11789: Unified the exclude logic by removing expected_bytes_for() and moving
authorLucas Di Pentima <lucas@curoverse.com>
Wed, 21 Jun 2017 15:12:18 +0000 (12:12 -0300)
committerLucas Di Pentima <lucas@curoverse.com>
Wed, 21 Jun 2017 15:12:18 +0000 (12:12 -0300)
the upload list code to a new method.
Updated tests to be in sync with this refactoring.

Arvados-DCO-1.1-Signed-off-by: Lucas Di Pentima <lucas@curoverse.com>

sdk/python/arvados/commands/put.py
sdk/python/tests/test_arv_put.py

index ba7ff2bbd9f62af754bb59762d17342eb1484441..048e4125f30803d92f9913b31def6ed3746bf85d 100644 (file)
@@ -383,7 +383,7 @@ class ArvPutUploadJob(object):
     }
 
     def __init__(self, paths, resume=True, use_cache=True, reporter=None,
-                 bytes_expected=None, name=None, owner_uuid=None,
+                 name=None, owner_uuid=None,
                  ensure_unique_name=False, num_retries=None,
                  put_threads=None, replication_desired=None,
                  filename=None, update_time=60.0, update_collection=None,
@@ -394,7 +394,9 @@ class ArvPutUploadJob(object):
         self.use_cache = use_cache
         self.update = False
         self.reporter = reporter
-        self.bytes_expected = bytes_expected
+        # This will set to 0 before start counting, if no special files are going
+        # to be read.
+        self.bytes_expected = None
         self.bytes_written = 0
         self.bytes_skipped = 0
         self.name = name
@@ -435,72 +437,97 @@ class ArvPutUploadJob(object):
         # Load cached data if any and if needed
         self._setup_state(update_collection)
 
+        # Build the upload file list, excluding requested files and counting the
+        # bytes expected to be uploaded.
+        self._build_upload_list()
+
+    def _build_upload_list(self):
+        """
+        Scan the requested paths to count file sizes, excluding files & dirs if requested
+        and building the upload file list.
+        """
+        # If there aren't special files to be read, reset total bytes count to zero
+        # to start counting.
+        if not any(filter(lambda p: not (os.path.isfile(p) or os.path.isdir(p)),
+                          self.paths)):
+            self.bytes_expected = 0
+
+        for path in self.paths:
+            # Test for stdin first, in case some file named '-' exist
+            if path == '-':
+                if self.dry_run:
+                    raise ArvPutUploadIsPending()
+                self._write_stdin(self.filename or 'stdin')
+            elif not os.path.exists(path):
+                 raise PathDoesNotExistError("file or directory '{}' does not exist.".format(path))
+            elif os.path.isdir(path):
+                # Use absolute paths on cache index so CWD doesn't interfere
+                # with the caching logic.
+                orig_path = path
+                path = os.path.abspath(path)
+                if orig_path[-1:] == os.sep:
+                    # When passing a directory reference with a trailing slash,
+                    # its contents should be uploaded directly to the collection's root.
+                    prefixdir = path
+                else:
+                    # When passing a directory reference with no trailing slash,
+                    # upload the directory to the collection's root.
+                    prefixdir = os.path.dirname(path)
+                prefixdir += os.sep
+                for root, dirs, files in os.walk(path, followlinks=self.follow_links):
+                    root_relpath = os.path.relpath(root, path)
+                    # Exclude files/dirs by full path matching pattern
+                    if self.exclude_paths:
+                        dirs[:] = filter(
+                            lambda d: not any([pathname_match(os.path.join(root_relpath, d),
+                                                              pat)
+                                               for pat in self.exclude_paths]),
+                            dirs)
+                        files = filter(
+                            lambda f: not any([pathname_match(os.path.join(root_relpath, f),
+                                                              pat)
+                                               for pat in self.exclude_paths]),
+                            files)
+                    # Exclude files/dirs by name matching pattern
+                    if self.exclude_names is not None:
+                        dirs[:] = filter(lambda d: not self.exclude_names.match(d), dirs)
+                        files = filter(lambda f: not self.exclude_names.match(f), files)
+                    # Make os.walk()'s dir traversing order deterministic
+                    dirs.sort()
+                    files.sort()
+                    for f in files:
+                        filepath = os.path.join(root, f)
+                        # Add its size to the total bytes count (if applicable)
+                        if self.follow_links or (not os.path.islink(filepath)):
+                            if self.bytes_expected is not None:
+                                self.bytes_expected += os.path.getsize(filepath)
+                        self._check_file(filepath,
+                                         os.path.join(root[len(prefixdir):], f))
+            else:
+                filepath = os.path.abspath(path)
+                # Add its size to the total bytes count (if applicable)
+                if self.follow_links or (not os.path.islink(filepath)):
+                    if self.bytes_expected is not None:
+                        self.bytes_expected += os.path.getsize(filepath)
+                self._check_file(filepath,
+                                 self.filename or os.path.basename(path))
+        # If dry-mode is on, and got up to this point, then we should notify that
+        # there aren't any file to upload.
+        if self.dry_run:
+            raise ArvPutUploadNotPending()
+        # Remove local_collection's files that don't exist locally anymore, so the
+        # bytes_written count is correct.
+        for f in self.collection_file_paths(self._local_collection,
+                                            path_prefix=""):
+            if f != 'stdin' and f != self.filename and not f in self._file_paths:
+                self._local_collection.remove(f)
+
     def start(self, save_collection):
         """
         Start supporting thread & file uploading
         """
-        if not self.dry_run:
-            self._checkpointer.start()
+        self._checkpointer.start()
         try:
-            for path in self.paths:
-                # Test for stdin first, in case some file named '-' exist
-                if path == '-':
-                    if self.dry_run:
-                        raise ArvPutUploadIsPending()
-                    self._write_stdin(self.filename or 'stdin')
-                elif not os.path.exists(path):
-                     raise PathDoesNotExistError("file or directory '{}' does not exist.".format(path))
-                elif os.path.isdir(path):
-                    # Use absolute paths on cache index so CWD doesn't interfere
-                    # with the caching logic.
-                    orig_path = path
-                    path = os.path.abspath(path)
-                    if orig_path[-1:] == os.sep:
-                        # When passing a directory reference with a trailing slash,
-                        # its contents should be uploaded directly to the collection's root.
-                        prefixdir = path
-                    else:
-                        # When passing a directory reference with no trailing slash,
-                        # upload the directory to the collection's root.
-                        prefixdir = os.path.dirname(path)
-                    prefixdir += os.sep
-                    for root, dirs, files in os.walk(path, followlinks=self.follow_links):
-                        root_relpath = os.path.relpath(root, path)
-                        # Exclude files/dirs by full path matching pattern
-                        if self.exclude_paths:
-                            dirs[:] = filter(
-                                lambda d: not any([pathname_match(os.path.join(root_relpath, d),
-                                                                  pat)
-                                                   for pat in self.exclude_paths]),
-                                dirs)
-                            files = filter(
-                                lambda f: not any([pathname_match(os.path.join(root_relpath, f),
-                                                                  pat)
-                                                   for pat in self.exclude_paths]),
-                                files)
-                        # Exclude files/dirs by name matching pattern
-                        if self.exclude_names is not None:
-                            dirs[:] = filter(lambda d: not self.exclude_names.match(d), dirs)
-                            files = filter(lambda f: not self.exclude_names.match(f), files)
-                        # Make os.walk()'s dir traversing order deterministic
-                        dirs.sort()
-                        files.sort()
-                        for f in files:
-                            self._check_file(os.path.join(root, f),
-                                             os.path.join(root[len(prefixdir):], f))
-                else:
-                    self._check_file(os.path.abspath(path),
-                                     self.filename or os.path.basename(path))
-            # If dry-mode is on, and got up to this point, then we should notify that
-            # there aren't any file to upload.
-            if self.dry_run:
-                raise ArvPutUploadNotPending()
-            # Remove local_collection's files that don't exist locally anymore, so the
-            # bytes_written count is correct.
-            for f in self.collection_file_paths(self._local_collection,
-                                                path_prefix=""):
-                if f != 'stdin' and f != self.filename and not f in self._file_paths:
-                    self._local_collection.remove(f)
             # Update bytes_written from current local collection and
             # report initial progress.
             self._update()
@@ -686,7 +713,13 @@ class ArvPutUploadJob(object):
             should_upload = True
 
         if should_upload:
-            self._files_to_upload.append((source, resume_offset, filename))
+            try:
+                self._files_to_upload.append((source, resume_offset, filename))
+            except ArvPutUploadIsPending:
+                # This could happen when running on dry-mode, close cache file to
+                # avoid locking issues.
+                self._cache_file.close()
+                raise
 
     def _upload_files(self):
         for source, resume_offset, filename in self._files_to_upload:
@@ -865,45 +898,6 @@ class ArvPutUploadJob(object):
             datablocks = self._datablocks_on_item(self._my_collection())
         return datablocks
 
-def expected_bytes_for(pathlist, follow_links=True, exclude={}):
-    # Walk the given directory trees and stat files, adding up file sizes,
-    # so we can display progress as percent
-    bytesum = 0
-    exclude_paths = exclude.get('paths', None)
-    exclude_names = exclude.get('names', None)
-    for path in pathlist:
-        if os.path.isdir(path):
-            for root, dirs, files in os.walk(path, followlinks=follow_links):
-                root_relpath = os.path.relpath(root, path)
-                # Exclude files/dirs by full path matching pattern
-                if exclude_paths is not None:
-                    dirs[:] = filter(
-                        lambda d: not any([pathname_match(os.path.join(root_relpath, d),
-                                                          pat)
-                                           for pat in exclude_paths]),
-                        dirs)
-                    files = filter(
-                        lambda f: not any([pathname_match(os.path.join(root_relpath, f),
-                                                          pat)
-                                           for pat in exclude_paths]),
-                        files)
-                # Exclude files/dirs by name matching pattern
-                if exclude_names is not None:
-                    dirs[:] = filter(lambda d: not exclude_names.match(d), dirs)
-                    files = filter(lambda f: not exclude_names.match(f), files)
-                # Sum file sizes
-                for f in files:
-                    filepath = os.path.join(root, f)
-                    # Ignore symlinked files when requested
-                    if (not follow_links) and os.path.islink(filepath):
-                        continue
-                    bytesum += os.path.getsize(filepath)
-        elif not os.path.isfile(path):
-            return None
-        else:
-            bytesum += os.path.getsize(path)
-    return bytesum
-
 _machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
                                                             os.getpid())
 
@@ -1029,19 +1023,12 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
     # uploaded, the expected bytes calculation can take a moment.
     if args.progress and any([os.path.isdir(f) for f in args.paths]):
         logger.info("Calculating upload size, this could take some time...")
-    bytes_expected = expected_bytes_for(args.paths,
-                                        follow_links=args.follow_links,
-                                        exclude={'paths': exclude_paths,
-                                                 'names': exclude_names})
-
-
     try:
         writer = ArvPutUploadJob(paths = args.paths,
                                  resume = args.resume,
                                  use_cache = args.use_cache,
                                  filename = args.filename,
                                  reporter = reporter,
-                                 bytes_expected = bytes_expected,
                                  num_retries = args.retries,
                                  replication_desired = args.replication,
                                  put_threads = args.threads,
@@ -1069,6 +1056,10 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
     except ArvPutUploadNotPending:
         # No files pending for upload
         sys.exit(0)
+    except PathDoesNotExistError as error:
+        logger.error("\n".join([
+            "arv-put: %s" % str(error)]))
+        sys.exit(1)
 
     # Install our signal handler for each code in CAUGHT_SIGNALS, and save
     # the originals.
@@ -1089,16 +1080,6 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
         logger.error("\n".join([
             "arv-put: %s" % str(error)]))
         sys.exit(1)
-    except ArvPutUploadIsPending:
-        # Dry run check successful, return proper exit code.
-        sys.exit(2)
-    except ArvPutUploadNotPending:
-        # No files pending for upload
-        sys.exit(0)
-    except PathDoesNotExistError as error:
-        logger.error("\n".join([
-            "arv-put: %s" % str(error)]))
-        sys.exit(1)
 
     if args.progress:  # Print newline to split stderr from stdout for humans.
         logger.info("\n")
index c2eaf12bbb048f24239839da2322fd0be69815c6..bfa39b861fdbf83162a86366840b5000e3f2e6ec 100644 (file)
@@ -299,9 +299,8 @@ class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
 
     def test_passing_nonexistant_path_raise_exception(self):
         uuid_str = str(uuid.uuid4())
-        cwriter = arv_put.ArvPutUploadJob(["/this/path/does/not/exist/{}".format(uuid_str)])
         with self.assertRaises(arv_put.PathDoesNotExistError):
-            cwriter.start(save_collection=False)
+            cwriter = arv_put.ArvPutUploadJob(["/this/path/does/not/exist/{}".format(uuid_str)])
 
     def test_writer_works_without_cache(self):
         cwriter = arv_put.ArvPutUploadJob(['/dev/null'], resume=False)
@@ -336,7 +335,8 @@ class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
             for expect_count in (None, 8):
                 progression, reporter = self.make_progress_tester()
                 cwriter = arv_put.ArvPutUploadJob([f.name],
-                    reporter=reporter, bytes_expected=expect_count)
+                                                  reporter=reporter)
+                cwriter.bytes_expected = expect_count
                 cwriter.start(save_collection=False)
                 cwriter.destroy_cache()
                 self.assertIn((3, expect_count), progression)
@@ -492,23 +492,20 @@ class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
             self.assertGreater(writer.bytes_written, 0)
             self.assertLess(writer.bytes_written,
                             os.path.getsize(self.large_file_name))
-        # Retry the upload using dry_run to check if there is a pending upload
-        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
-                                          replication_desired=1,
-                                          dry_run=True)
         with self.assertRaises(arv_put.ArvPutUploadIsPending):
-            writer2.start(save_collection=False)
+            # Retry the upload using dry_run to check if there is a pending upload
+            writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
+                                              replication_desired=1,
+                                              dry_run=True)
         # Complete the pending upload
         writer3 = arv_put.ArvPutUploadJob([self.large_file_name],
                                           replication_desired=1)
         writer3.start(save_collection=False)
-        # Confirm there's no pending upload with dry_run=True
-        writer4 = arv_put.ArvPutUploadJob([self.large_file_name],
-                                          replication_desired=1,
-                                          dry_run=True)
         with self.assertRaises(arv_put.ArvPutUploadNotPending):
-            writer4.start(save_collection=False)
-        writer4.destroy_cache()
+            # Confirm there's no pending upload with dry_run=True
+            writer4 = arv_put.ArvPutUploadJob([self.large_file_name],
+                                              replication_desired=1,
+                                              dry_run=True)
         # Test obvious cases
         with self.assertRaises(arv_put.ArvPutUploadIsPending):
             arv_put.ArvPutUploadJob([self.large_file_name],
@@ -527,21 +524,27 @@ class ArvadosExpectedBytesTest(ArvadosBaseTestCase):
     TEST_SIZE = os.path.getsize(__file__)
 
     def test_expected_bytes_for_file(self):
+        writer = arv_put.ArvPutUploadJob([__file__])
         self.assertEqual(self.TEST_SIZE,
-                          arv_put.expected_bytes_for([__file__]))
+                         writer.bytes_expected)
 
     def test_expected_bytes_for_tree(self):
         tree = self.make_tmpdir()
         shutil.copyfile(__file__, os.path.join(tree, 'one'))
         shutil.copyfile(__file__, os.path.join(tree, 'two'))
+
+        writer = arv_put.ArvPutUploadJob([tree])
         self.assertEqual(self.TEST_SIZE * 2,
-                          arv_put.expected_bytes_for([tree]))
+                         writer.bytes_expected)
+        writer = arv_put.ArvPutUploadJob([tree, __file__])
         self.assertEqual(self.TEST_SIZE * 3,
-                          arv_put.expected_bytes_for([tree, __file__]))
+                         writer.bytes_expected)
 
     def test_expected_bytes_for_device(self):
-        self.assertIsNone(arv_put.expected_bytes_for(['/dev/null']))
-        self.assertIsNone(arv_put.expected_bytes_for([__file__, '/dev/null']))
+        writer = arv_put.ArvPutUploadJob(['/dev/null'])
+        self.assertIsNone(writer.bytes_expected)
+        writer = arv_put.ArvPutUploadJob([__file__, '/dev/null'])
+        self.assertIsNone(writer.bytes_expected)
 
 
 class ArvadosPutReportTest(ArvadosBaseTestCase):