From: Lucas Di Pentima Date: Wed, 21 Jun 2017 15:12:18 +0000 (-0300) Subject: 11789: Unified the exclude logic by removing expected_bytes_for() and moving X-Git-Tag: 1.1.0~166^2~7 X-Git-Url: https://git.arvados.org/arvados.git/commitdiff_plain/c2ceb956b2c9a7d5d2e14925cd1b68e332b36ad6 11789: Unified the exclude logic by removing expected_bytes_for() and moving the upload list code to a new method. Updated tests to be in sync with this refactoring. Arvados-DCO-1.1-Signed-off-by: Lucas Di Pentima --- diff --git a/sdk/python/arvados/commands/put.py b/sdk/python/arvados/commands/put.py index ba7ff2bbd9..048e4125f3 100644 --- a/sdk/python/arvados/commands/put.py +++ b/sdk/python/arvados/commands/put.py @@ -383,7 +383,7 @@ class ArvPutUploadJob(object): } def __init__(self, paths, resume=True, use_cache=True, reporter=None, - bytes_expected=None, name=None, owner_uuid=None, + name=None, owner_uuid=None, ensure_unique_name=False, num_retries=None, put_threads=None, replication_desired=None, filename=None, update_time=60.0, update_collection=None, @@ -394,7 +394,9 @@ class ArvPutUploadJob(object): self.use_cache = use_cache self.update = False self.reporter = reporter - self.bytes_expected = bytes_expected + # This will set to 0 before start counting, if no special files are going + # to be read. + self.bytes_expected = None self.bytes_written = 0 self.bytes_skipped = 0 self.name = name @@ -435,72 +437,97 @@ class ArvPutUploadJob(object): # Load cached data if any and if needed self._setup_state(update_collection) + # Build the upload file list, excluding requested files and counting the + # bytes expected to be uploaded. + self._build_upload_list() + + def _build_upload_list(self): + """ + Scan the requested paths to count file sizes, excluding files & dirs if requested + and building the upload file list. + """ + # If there aren't special files to be read, reset total bytes count to zero + # to start counting. + if not any(filter(lambda p: not (os.path.isfile(p) or os.path.isdir(p)), + self.paths)): + self.bytes_expected = 0 + + for path in self.paths: + # Test for stdin first, in case some file named '-' exist + if path == '-': + if self.dry_run: + raise ArvPutUploadIsPending() + self._write_stdin(self.filename or 'stdin') + elif not os.path.exists(path): + raise PathDoesNotExistError("file or directory '{}' does not exist.".format(path)) + elif os.path.isdir(path): + # Use absolute paths on cache index so CWD doesn't interfere + # with the caching logic. + orig_path = path + path = os.path.abspath(path) + if orig_path[-1:] == os.sep: + # When passing a directory reference with a trailing slash, + # its contents should be uploaded directly to the collection's root. + prefixdir = path + else: + # When passing a directory reference with no trailing slash, + # upload the directory to the collection's root. + prefixdir = os.path.dirname(path) + prefixdir += os.sep + for root, dirs, files in os.walk(path, followlinks=self.follow_links): + root_relpath = os.path.relpath(root, path) + # Exclude files/dirs by full path matching pattern + if self.exclude_paths: + dirs[:] = filter( + lambda d: not any([pathname_match(os.path.join(root_relpath, d), + pat) + for pat in self.exclude_paths]), + dirs) + files = filter( + lambda f: not any([pathname_match(os.path.join(root_relpath, f), + pat) + for pat in self.exclude_paths]), + files) + # Exclude files/dirs by name matching pattern + if self.exclude_names is not None: + dirs[:] = filter(lambda d: not self.exclude_names.match(d), dirs) + files = filter(lambda f: not self.exclude_names.match(f), files) + # Make os.walk()'s dir traversing order deterministic + dirs.sort() + files.sort() + for f in files: + filepath = os.path.join(root, f) + # Add its size to the total bytes count (if applicable) + if self.follow_links or (not os.path.islink(filepath)): + if self.bytes_expected is not None: + self.bytes_expected += os.path.getsize(filepath) + self._check_file(filepath, + os.path.join(root[len(prefixdir):], f)) + else: + filepath = os.path.abspath(path) + # Add its size to the total bytes count (if applicable) + if self.follow_links or (not os.path.islink(filepath)): + if self.bytes_expected is not None: + self.bytes_expected += os.path.getsize(filepath) + self._check_file(filepath, + self.filename or os.path.basename(path)) + # If dry-mode is on, and got up to this point, then we should notify that + # there aren't any file to upload. + if self.dry_run: + raise ArvPutUploadNotPending() + # Remove local_collection's files that don't exist locally anymore, so the + # bytes_written count is correct. + for f in self.collection_file_paths(self._local_collection, + path_prefix=""): + if f != 'stdin' and f != self.filename and not f in self._file_paths: + self._local_collection.remove(f) + def start(self, save_collection): """ Start supporting thread & file uploading """ - if not self.dry_run: - self._checkpointer.start() + self._checkpointer.start() try: - for path in self.paths: - # Test for stdin first, in case some file named '-' exist - if path == '-': - if self.dry_run: - raise ArvPutUploadIsPending() - self._write_stdin(self.filename or 'stdin') - elif not os.path.exists(path): - raise PathDoesNotExistError("file or directory '{}' does not exist.".format(path)) - elif os.path.isdir(path): - # Use absolute paths on cache index so CWD doesn't interfere - # with the caching logic. - orig_path = path - path = os.path.abspath(path) - if orig_path[-1:] == os.sep: - # When passing a directory reference with a trailing slash, - # its contents should be uploaded directly to the collection's root. - prefixdir = path - else: - # When passing a directory reference with no trailing slash, - # upload the directory to the collection's root. - prefixdir = os.path.dirname(path) - prefixdir += os.sep - for root, dirs, files in os.walk(path, followlinks=self.follow_links): - root_relpath = os.path.relpath(root, path) - # Exclude files/dirs by full path matching pattern - if self.exclude_paths: - dirs[:] = filter( - lambda d: not any([pathname_match(os.path.join(root_relpath, d), - pat) - for pat in self.exclude_paths]), - dirs) - files = filter( - lambda f: not any([pathname_match(os.path.join(root_relpath, f), - pat) - for pat in self.exclude_paths]), - files) - # Exclude files/dirs by name matching pattern - if self.exclude_names is not None: - dirs[:] = filter(lambda d: not self.exclude_names.match(d), dirs) - files = filter(lambda f: not self.exclude_names.match(f), files) - # Make os.walk()'s dir traversing order deterministic - dirs.sort() - files.sort() - for f in files: - self._check_file(os.path.join(root, f), - os.path.join(root[len(prefixdir):], f)) - else: - self._check_file(os.path.abspath(path), - self.filename or os.path.basename(path)) - # If dry-mode is on, and got up to this point, then we should notify that - # there aren't any file to upload. - if self.dry_run: - raise ArvPutUploadNotPending() - # Remove local_collection's files that don't exist locally anymore, so the - # bytes_written count is correct. - for f in self.collection_file_paths(self._local_collection, - path_prefix=""): - if f != 'stdin' and f != self.filename and not f in self._file_paths: - self._local_collection.remove(f) # Update bytes_written from current local collection and # report initial progress. self._update() @@ -686,7 +713,13 @@ class ArvPutUploadJob(object): should_upload = True if should_upload: - self._files_to_upload.append((source, resume_offset, filename)) + try: + self._files_to_upload.append((source, resume_offset, filename)) + except ArvPutUploadIsPending: + # This could happen when running on dry-mode, close cache file to + # avoid locking issues. + self._cache_file.close() + raise def _upload_files(self): for source, resume_offset, filename in self._files_to_upload: @@ -865,45 +898,6 @@ class ArvPutUploadJob(object): datablocks = self._datablocks_on_item(self._my_collection()) return datablocks -def expected_bytes_for(pathlist, follow_links=True, exclude={}): - # Walk the given directory trees and stat files, adding up file sizes, - # so we can display progress as percent - bytesum = 0 - exclude_paths = exclude.get('paths', None) - exclude_names = exclude.get('names', None) - for path in pathlist: - if os.path.isdir(path): - for root, dirs, files in os.walk(path, followlinks=follow_links): - root_relpath = os.path.relpath(root, path) - # Exclude files/dirs by full path matching pattern - if exclude_paths is not None: - dirs[:] = filter( - lambda d: not any([pathname_match(os.path.join(root_relpath, d), - pat) - for pat in exclude_paths]), - dirs) - files = filter( - lambda f: not any([pathname_match(os.path.join(root_relpath, f), - pat) - for pat in exclude_paths]), - files) - # Exclude files/dirs by name matching pattern - if exclude_names is not None: - dirs[:] = filter(lambda d: not exclude_names.match(d), dirs) - files = filter(lambda f: not exclude_names.match(f), files) - # Sum file sizes - for f in files: - filepath = os.path.join(root, f) - # Ignore symlinked files when requested - if (not follow_links) and os.path.islink(filepath): - continue - bytesum += os.path.getsize(filepath) - elif not os.path.isfile(path): - return None - else: - bytesum += os.path.getsize(path) - return bytesum - _machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0], os.getpid()) @@ -1029,19 +1023,12 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr): # uploaded, the expected bytes calculation can take a moment. if args.progress and any([os.path.isdir(f) for f in args.paths]): logger.info("Calculating upload size, this could take some time...") - bytes_expected = expected_bytes_for(args.paths, - follow_links=args.follow_links, - exclude={'paths': exclude_paths, - 'names': exclude_names}) - - try: writer = ArvPutUploadJob(paths = args.paths, resume = args.resume, use_cache = args.use_cache, filename = args.filename, reporter = reporter, - bytes_expected = bytes_expected, num_retries = args.retries, replication_desired = args.replication, put_threads = args.threads, @@ -1069,6 +1056,10 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr): except ArvPutUploadNotPending: # No files pending for upload sys.exit(0) + except PathDoesNotExistError as error: + logger.error("\n".join([ + "arv-put: %s" % str(error)])) + sys.exit(1) # Install our signal handler for each code in CAUGHT_SIGNALS, and save # the originals. @@ -1089,16 +1080,6 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr): logger.error("\n".join([ "arv-put: %s" % str(error)])) sys.exit(1) - except ArvPutUploadIsPending: - # Dry run check successful, return proper exit code. - sys.exit(2) - except ArvPutUploadNotPending: - # No files pending for upload - sys.exit(0) - except PathDoesNotExistError as error: - logger.error("\n".join([ - "arv-put: %s" % str(error)])) - sys.exit(1) if args.progress: # Print newline to split stderr from stdout for humans. logger.info("\n") diff --git a/sdk/python/tests/test_arv_put.py b/sdk/python/tests/test_arv_put.py index c2eaf12bbb..bfa39b861f 100644 --- a/sdk/python/tests/test_arv_put.py +++ b/sdk/python/tests/test_arv_put.py @@ -299,9 +299,8 @@ class ArvPutUploadJobTest(run_test_server.TestCaseWithServers, def test_passing_nonexistant_path_raise_exception(self): uuid_str = str(uuid.uuid4()) - cwriter = arv_put.ArvPutUploadJob(["/this/path/does/not/exist/{}".format(uuid_str)]) with self.assertRaises(arv_put.PathDoesNotExistError): - cwriter.start(save_collection=False) + cwriter = arv_put.ArvPutUploadJob(["/this/path/does/not/exist/{}".format(uuid_str)]) def test_writer_works_without_cache(self): cwriter = arv_put.ArvPutUploadJob(['/dev/null'], resume=False) @@ -336,7 +335,8 @@ class ArvPutUploadJobTest(run_test_server.TestCaseWithServers, for expect_count in (None, 8): progression, reporter = self.make_progress_tester() cwriter = arv_put.ArvPutUploadJob([f.name], - reporter=reporter, bytes_expected=expect_count) + reporter=reporter) + cwriter.bytes_expected = expect_count cwriter.start(save_collection=False) cwriter.destroy_cache() self.assertIn((3, expect_count), progression) @@ -492,23 +492,20 @@ class ArvPutUploadJobTest(run_test_server.TestCaseWithServers, self.assertGreater(writer.bytes_written, 0) self.assertLess(writer.bytes_written, os.path.getsize(self.large_file_name)) - # Retry the upload using dry_run to check if there is a pending upload - writer2 = arv_put.ArvPutUploadJob([self.large_file_name], - replication_desired=1, - dry_run=True) with self.assertRaises(arv_put.ArvPutUploadIsPending): - writer2.start(save_collection=False) + # Retry the upload using dry_run to check if there is a pending upload + writer2 = arv_put.ArvPutUploadJob([self.large_file_name], + replication_desired=1, + dry_run=True) # Complete the pending upload writer3 = arv_put.ArvPutUploadJob([self.large_file_name], replication_desired=1) writer3.start(save_collection=False) - # Confirm there's no pending upload with dry_run=True - writer4 = arv_put.ArvPutUploadJob([self.large_file_name], - replication_desired=1, - dry_run=True) with self.assertRaises(arv_put.ArvPutUploadNotPending): - writer4.start(save_collection=False) - writer4.destroy_cache() + # Confirm there's no pending upload with dry_run=True + writer4 = arv_put.ArvPutUploadJob([self.large_file_name], + replication_desired=1, + dry_run=True) # Test obvious cases with self.assertRaises(arv_put.ArvPutUploadIsPending): arv_put.ArvPutUploadJob([self.large_file_name], @@ -527,21 +524,27 @@ class ArvadosExpectedBytesTest(ArvadosBaseTestCase): TEST_SIZE = os.path.getsize(__file__) def test_expected_bytes_for_file(self): + writer = arv_put.ArvPutUploadJob([__file__]) self.assertEqual(self.TEST_SIZE, - arv_put.expected_bytes_for([__file__])) + writer.bytes_expected) def test_expected_bytes_for_tree(self): tree = self.make_tmpdir() shutil.copyfile(__file__, os.path.join(tree, 'one')) shutil.copyfile(__file__, os.path.join(tree, 'two')) + + writer = arv_put.ArvPutUploadJob([tree]) self.assertEqual(self.TEST_SIZE * 2, - arv_put.expected_bytes_for([tree])) + writer.bytes_expected) + writer = arv_put.ArvPutUploadJob([tree, __file__]) self.assertEqual(self.TEST_SIZE * 3, - arv_put.expected_bytes_for([tree, __file__])) + writer.bytes_expected) def test_expected_bytes_for_device(self): - self.assertIsNone(arv_put.expected_bytes_for(['/dev/null'])) - self.assertIsNone(arv_put.expected_bytes_for([__file__, '/dev/null'])) + writer = arv_put.ArvPutUploadJob(['/dev/null']) + self.assertIsNone(writer.bytes_expected) + writer = arv_put.ArvPutUploadJob([__file__, '/dev/null']) + self.assertIsNone(writer.bytes_expected) class ArvadosPutReportTest(ArvadosBaseTestCase):