From a62e41101a75e01f9b0dd7124eef81714443d8a1 Mon Sep 17 00:00:00 2001
From: Lucas Di Pentima
Date: Fri, 9 Dec 2016 18:36:29 -0300
Subject: [PATCH] 10383: Refactored the file upload decision code so that it
 first skims through the entire file list instead of deciding on a
 file-by-file basis.

This allows pre-calculating how many bytes will be skipped when resuming,
which in turn makes the upload progress report precise. Also updated the
affected tests so they pass with this new way of counting bytes.
---
 sdk/python/arvados/commands/put.py | 29 +++++++++++++++++++----------
 sdk/python/tests/test_arv_put.py   | 11 ++++++-----
 2 files changed, 25 insertions(+), 15 deletions(-)
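The shape of the change, as a minimal standalone sketch (TwoPassUploader,
check_file, upload_all and write_chunk are illustrative names chosen here,
not the actual ArvPutUploadJob API): the first pass only decides what to
send and accumulates bytes_skipped; the second pass performs the writes,
so the skipped total is fully known before the first progress report.

    import os
    from collections import namedtuple

    # (source path, offset to resume from, name inside the collection)
    FileTask = namedtuple('FileTask', ['source', 'resume_offset', 'filename'])

    class TwoPassUploader(object):
        def __init__(self):
            self.bytes_skipped = 0     # fully known after the first pass
            self.bytes_written = 0     # grows during the second pass
            self._files_to_upload = []

        def check_file(self, source, filename, already_uploaded=0):
            """First pass: decide, don't upload; accumulate bytes_skipped."""
            size = os.path.getsize(source)
            if already_uploaded >= size:
                # File completed on a previous run, nothing to send.
                self.bytes_skipped += size
            else:
                # Resuming: the already-uploaded prefix is skipped up front.
                self.bytes_skipped += already_uploaded
                self._files_to_upload.append(
                    FileTask(source, already_uploaded, filename))

        def upload_all(self, write_chunk):
            """Second pass: perform the uploads recorded by check_file()."""
            for task in self._files_to_upload:
                with open(task.source, 'rb') as source_fd:
                    source_fd.seek(task.resume_offset)
                    for chunk in iter(lambda: source_fd.read(64 * 1024), b''):
                        write_chunk(task.filename, chunk)
                        self.bytes_written += len(chunk)

With this split, a progress reporter can use bytes_skipped + bytes_written
as the running total from the very first update, which is what makes
progress on resumed uploads precise.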
diff --git a/sdk/python/arvados/commands/put.py b/sdk/python/arvados/commands/put.py
index 3e82bdf1da..8995ea95a2 100644
--- a/sdk/python/arvados/commands/put.py
+++ b/sdk/python/arvados/commands/put.py
@@ -341,6 +341,7 @@ class ArvPutUploadJob(object):
         self._stop_checkpointer = threading.Event()
         self._checkpointer = threading.Thread(target=self._update_task)
         self._update_task_time = update_time  # How many seconds wait between update runs
+        self._files_to_upload = []
         self.logger = logging.getLogger('arvados.arv_put')
 
         if not self.use_cache and self.resume:
@@ -371,11 +372,16 @@ class ArvPutUploadJob(object):
                     dirs.sort()
                     files.sort()
                     for f in files:
-                        self._write_file(os.path.join(root, f),
+                        self._check_file(os.path.join(root, f),
                                          os.path.join(root[len(prefixdir):], f))
                 else:
-                    self._write_file(os.path.abspath(path),
+                    self._check_file(os.path.abspath(path),
                                      self.filename or os.path.basename(path))
+            # Update bytes_written from current local collection and
+            # report initial progress.
+            self._update()
+            # Actual file upload
+            self._upload_files()
         finally:
             # Stop the thread before doing anything else
             self._stop_checkpointer.set()
@@ -387,8 +393,6 @@ class ArvPutUploadJob(object):
                 self._cache_file.close()
         if save_collection:
             self.save_collection()
-        # Correct the final written bytes count
-        self.bytes_written -= self.bytes_skipped
 
     def save_collection(self):
         if self.update:
@@ -468,11 +472,11 @@ class ArvPutUploadJob(object):
             self._write(sys.stdin, output)
             output.close()
 
-    def _write_file(self, source, filename):
+    def _check_file(self, source, filename):
+        """Check if this file needs to be uploaded"""
         resume_offset = 0
         should_upload = False
         new_file_in_cache = False
-
         # Record file path for updating the remote collection before exiting
         self._file_paths.append(filename)
 
@@ -505,22 +509,31 @@ class ArvPutUploadJob(object):
                 # Permission token expired, re-upload file. This will change whenever
                 # we have a API for refreshing tokens.
                 should_upload = True
+                self._local_collection.remove(filename)
             elif cached_file_data['size'] == file_in_local_collection.size():
                 # File already there, skip it.
                 self.bytes_skipped += cached_file_data['size']
             elif cached_file_data['size'] > file_in_local_collection.size():
                 # File partially uploaded, resume!
                 resume_offset = file_in_local_collection.size()
+                self.bytes_skipped += resume_offset
                 should_upload = True
             else:
                 # Inconsistent cache, re-upload the file
                 should_upload = True
+                self._local_collection.remove(filename)
                 self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
         # Local file differs from cached data, re-upload it.
         else:
+            if file_in_local_collection:
+                self._local_collection.remove(filename)
             should_upload = True
 
         if should_upload:
+            self._files_to_upload.append((source, resume_offset, filename))
+
+    def _upload_files(self):
+        for source, resume_offset, filename in self._files_to_upload:
             with open(source, 'r') as source_fd:
                 with self._state_lock:
                     self._state['files'][source]['mtime'] = os.path.getmtime(source)
@@ -529,7 +542,6 @@ class ArvPutUploadJob(object):
                     # Start upload where we left off
                     output = self._local_collection.open(filename, 'a')
                     source_fd.seek(resume_offset)
-                    self.bytes_skipped += resume_offset
                 else:
                     # Start from scratch
                     output = self._local_collection.open(filename, 'w')
@@ -594,9 +606,6 @@ class ArvPutUploadJob(object):
                 self._state = copy.deepcopy(self.EMPTY_STATE)
         # Load the previous manifest so we can check if files were modified remotely.
         self._local_collection = arvados.collection.Collection(self._state['manifest'], replication_desired=self.replication_desired)
-        # Load how many bytes were uploaded on previous run
-        with self._collection_lock:
-            self.bytes_written = self._collection_size(self._local_collection)
 
     def _lock_file(self, fileobj):
         try:
diff --git a/sdk/python/tests/test_arv_put.py b/sdk/python/tests/test_arv_put.py
index fee32a3028..f42c0fc59e 100644
--- a/sdk/python/tests/test_arv_put.py
+++ b/sdk/python/tests/test_arv_put.py
@@ -278,12 +278,12 @@ class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
             f.flush()
             cwriter = arv_put.ArvPutUploadJob([f.name])
             cwriter.start(save_collection=False)
-            self.assertEqual(3, cwriter.bytes_written)
+            self.assertEqual(3, cwriter.bytes_written - cwriter.bytes_skipped)
             # Don't destroy the cache, and start another upload
             cwriter_new = arv_put.ArvPutUploadJob([f.name])
             cwriter_new.start(save_collection=False)
             cwriter_new.destroy_cache()
-            self.assertEqual(0, cwriter_new.bytes_written)
+            self.assertEqual(0, cwriter_new.bytes_written - cwriter_new.bytes_skipped)
 
     def make_progress_tester(self):
         progression = []
@@ -324,13 +324,14 @@ class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
                                         replication_desired=1)
         with self.assertRaises(SystemExit):
             writer.start(save_collection=False)
-        self.assertLess(writer.bytes_written,
-                        os.path.getsize(self.large_file_name))
+        self.assertGreater(writer.bytes_written, 0)
+        self.assertLess(writer.bytes_written,
+                        os.path.getsize(self.large_file_name))
         # Retry the upload
         writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                           replication_desired=1)
         writer2.start(save_collection=False)
-        self.assertEqual(writer.bytes_written + writer2.bytes_written,
+        self.assertEqual(writer.bytes_written + writer2.bytes_written - writer2.bytes_skipped,
                         os.path.getsize(self.large_file_name))
         writer2.destroy_cache()
 
-- 
2.30.2
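For reference, the accounting the updated assertions rely on, with made-up
numbers: after this change bytes_written reflects the whole local collection
(resumed bytes included), so the bytes actually sent during a run are
bytes_written - bytes_skipped.

    # Hypothetical numbers mirroring the arithmetic in the updated tests.
    file_size = 100        # size of the file being uploaded
    uploaded_before = 60   # bytes already present from an interrupted run

    bytes_skipped = uploaded_before   # pre-calculated during the first pass
    bytes_written = file_size         # whole local collection after this run

    sent_this_run = bytes_written - bytes_skipped
    assert sent_this_run == 40
    assert bytes_skipped + sent_this_run == file_size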