10383: Refactored the file upload decision code so that it first skims through
author	Lucas Di Pentima <lucas@curoverse.com>
Fri, 9 Dec 2016 21:36:29 +0000 (18:36 -0300)
committer	Lucas Di Pentima <lucas@curoverse.com>
Fri, 9 Dec 2016 21:36:29 +0000 (18:36 -0300)
the entire list instead of deciding on a file-by-file basis. This makes it
possible to pre-calculate how many bytes will be skipped when resuming, which
in turn allows the upload progress report to be precise.
Also updated the affected tests so they pass with this new way of counting bytes.
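
The change amounts to splitting one pass into two. Below is a minimal,
self-contained sketch of that two-pass shape, with simplified names and a
plain dict standing in for arv_put's resume cache; it is illustrative, not
the project's actual code.

import os

class UploadJobSketch:
    """Two-pass upload planner: scan everything first, then upload."""

    def __init__(self, paths, cached_sizes=None):
        self.paths = paths
        # Bytes already uploaded by a previous, interrupted run, per path.
        # arv_put reads this from its resume cache; a dict stands in here.
        self.cached_sizes = cached_sizes or {}
        self.bytes_skipped = 0        # fully known before any upload starts
        self._files_to_upload = []    # (source, resume_offset) pairs

    def start(self):
        # Pass 1: classify every file and pre-calculate the skipped bytes.
        for source in self.paths:
            self._check_file(source)
        # At this point a progress report can be exact: both the total and
        # the already-done portion are known before the first byte is sent.
        # Pass 2: upload only what pass 1 queued.
        self._upload_files()

    def _check_file(self, source):
        done = self.cached_sizes.get(source, 0)
        total = os.path.getsize(source)
        if done == total:
            self.bytes_skipped += total    # finished earlier, skip entirely
        else:
            self.bytes_skipped += done     # partial upload, resume mid-file
            self._files_to_upload.append((source, done))

    def _upload_files(self):
        for source, offset in self._files_to_upload:
            with open(source, 'rb') as f:
                f.seek(offset)             # pick up where the last run stopped
                data = f.read()            # the real code streams in chunks
                # ... append `data` to the destination collection here ...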

sdk/python/arvados/commands/put.py
sdk/python/tests/test_arv_put.py

diff --git a/sdk/python/arvados/commands/put.py b/sdk/python/arvados/commands/put.py
index 3e82bdf1dad4b661638e2a4343ce83bcfb4dd976..8995ea95a29870e3d65aad045b74b0df4c39feb9 100644
@@ -341,6 +341,7 @@ class ArvPutUploadJob(object):
         self._stop_checkpointer = threading.Event()
         self._checkpointer = threading.Thread(target=self._update_task)
         self._update_task_time = update_time  # How many seconds wait between update runs
+        self._files_to_upload = []
         self.logger = logging.getLogger('arvados.arv_put')
 
         if not self.use_cache and self.resume:
@@ -371,11 +372,16 @@ class ArvPutUploadJob(object):
                         dirs.sort()
                         files.sort()
                         for f in files:
-                            self._write_file(os.path.join(root, f),
+                            self._check_file(os.path.join(root, f),
                                              os.path.join(root[len(prefixdir):], f))
                 else:
-                    self._write_file(os.path.abspath(path),
+                    self._check_file(os.path.abspath(path),
                                      self.filename or os.path.basename(path))
+            # Update bytes_written from current local collection and
+            # report initial progress.
+            self._update()
+            # Actual file upload
+            self._upload_files()
         finally:
             # Stop the thread before doing anything else
             self._stop_checkpointer.set()
@@ -387,8 +393,6 @@ class ArvPutUploadJob(object):
                 self._cache_file.close()
             if save_collection:
                 self.save_collection()
-            # Correct the final written bytes count
-            self.bytes_written -= self.bytes_skipped
 
     def save_collection(self):
         if self.update:
@@ -468,11 +472,11 @@ class ArvPutUploadJob(object):
         self._write(sys.stdin, output)
         output.close()
 
-    def _write_file(self, source, filename):
+    def _check_file(self, source, filename):
+        """Check if this file needs to be uploaded"""
         resume_offset = 0
         should_upload = False
         new_file_in_cache = False
-
         # Record file path for updating the remote collection before exiting
         self._file_paths.append(filename)
 
@@ -505,22 +509,31 @@ class ArvPutUploadJob(object):
                 # Permission token expired, re-upload file. This will change whenever
                 # we have a API for refreshing tokens.
                 should_upload = True
+                self._local_collection.remove(filename)
             elif cached_file_data['size'] == file_in_local_collection.size():
                 # File already there, skip it.
                 self.bytes_skipped += cached_file_data['size']
             elif cached_file_data['size'] > file_in_local_collection.size():
                 # File partially uploaded, resume!
                 resume_offset = file_in_local_collection.size()
+                self.bytes_skipped += resume_offset
                 should_upload = True
             else:
                 # Inconsistent cache, re-upload the file
                 should_upload = True
+                self._local_collection.remove(filename)
                 self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
         # Local file differs from cached data, re-upload it.
         else:
+            if file_in_local_collection:
+                self._local_collection.remove(filename)
             should_upload = True
 
         if should_upload:
+            self._files_to_upload.append((source, resume_offset, filename))
+
+    def _upload_files(self):
+        for source, resume_offset, filename in self._files_to_upload:
             with open(source, 'r') as source_fd:
                 with self._state_lock:
                     self._state['files'][source]['mtime'] = os.path.getmtime(source)
@@ -529,7 +542,6 @@ class ArvPutUploadJob(object):
                     # Start upload where we left off
                     output = self._local_collection.open(filename, 'a')
                     source_fd.seek(resume_offset)
-                    self.bytes_skipped += resume_offset
                 else:
                     # Start from scratch
                     output = self._local_collection.open(filename, 'w')
@@ -594,9 +606,6 @@ class ArvPutUploadJob(object):
                 self._state = copy.deepcopy(self.EMPTY_STATE)
             # Load the previous manifest so we can check if files were modified remotely.
             self._local_collection = arvados.collection.Collection(self._state['manifest'], replication_desired=self.replication_desired)
-        # Load how many bytes were uploaded on previous run
-        with self._collection_lock:
-            self.bytes_written = self._collection_size(self._local_collection)
 
     def _lock_file(self, fileobj):
         try:
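
For reference, the resume decision implemented by _check_file above reduces
to four cases. The following condensed restatement is illustrative only
(simplified parameters, not arv_put's API); the second return value is what
the real code adds to bytes_skipped.

def classify(source_changed, token_valid, cached_size, collection_size):
    """Condensed restatement of the _check_file cases (illustrative)."""
    if source_changed or not token_valid:
        return 'reupload', 0               # drop partial data, start over
    if cached_size == collection_size:
        return 'skip', cached_size         # whole file already uploaded
    if cached_size > collection_size:
        return 'resume', collection_size   # append from the resume offset
    return 'reupload', 0                   # collection bigger than the size
                                           # on record: inconsistent cache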
diff --git a/sdk/python/tests/test_arv_put.py b/sdk/python/tests/test_arv_put.py
index fee32a30283056edd2196cfdd308083cd700659a..f42c0fc59ebf495b5560cc6b9eb7bc4cbfacf84f 100644
@@ -278,12 +278,12 @@ class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
             f.flush()
             cwriter = arv_put.ArvPutUploadJob([f.name])
             cwriter.start(save_collection=False)
-            self.assertEqual(3, cwriter.bytes_written)
+            self.assertEqual(3, cwriter.bytes_written - cwriter.bytes_skipped)
             # Don't destroy the cache, and start another upload
             cwriter_new = arv_put.ArvPutUploadJob([f.name])
             cwriter_new.start(save_collection=False)
             cwriter_new.destroy_cache()
-            self.assertEqual(0, cwriter_new.bytes_written)
+            self.assertEqual(0, cwriter_new.bytes_written - cwriter_new.bytes_skipped)
 
     def make_progress_tester(self):
         progression = []
@@ -324,13 +324,14 @@ class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
                                              replication_desired=1)
             with self.assertRaises(SystemExit):
                 writer.start(save_collection=False)
-                self.assertLess(writer.bytes_written,
-                                os.path.getsize(self.large_file_name))
+            self.assertGreater(writer.bytes_written, 0)
+            self.assertLess(writer.bytes_written,
+                            os.path.getsize(self.large_file_name))
         # Retry the upload
         writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                           replication_desired=1)
         writer2.start(save_collection=False)
-        self.assertEqual(writer.bytes_written + writer2.bytes_written,
+        self.assertEqual(writer.bytes_written + writer2.bytes_written - writer2.bytes_skipped,
                          os.path.getsize(self.large_file_name))
         writer2.destroy_cache()
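
The test updates follow from dropping the final bytes_written -= bytes_skipped
correction: bytes_written now reflects everything in the local collection,
including bytes recovered from the cache, so the bytes actually sent during a
given run are bytes_written - bytes_skipped. A worked version of the 3-byte
resume case above (values taken from the test, wiring illustrative):

# Run 1 uploads a 3-byte file from scratch; run 2 starts the same upload
# with the cache intact and finds the whole file already in the collection.
run1_written, run1_skipped = 3, 0
run2_written, run2_skipped = 3, 3

assert run1_written - run1_skipped == 3   # run 1 actually sent 3 bytes
assert run2_written - run2_skipped == 0   # run 2 sent nothing new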