}
def __init__(self, paths, resume=True, use_cache=True, reporter=None,
- bytes_expected=None, name=None, owner_uuid=None,
+ name=None, owner_uuid=None,
ensure_unique_name=False, num_retries=None,
put_threads=None, replication_desired=None,
filename=None, update_time=60.0, update_collection=None,
self.use_cache = use_cache
self.update = False
self.reporter = reporter
- self.bytes_expected = bytes_expected
+ # This will be set to 0 before counting starts, if no special files are
+ # going to be read.
+ self.bytes_expected = None
self.bytes_written = 0
self.bytes_skipped = 0
self.name = name
# Load cached data if any and if needed
self._setup_state(update_collection)
+ # Build the upload file list, excluding files/dirs as requested and
+ # counting the bytes expected to be uploaded.
+ self._build_upload_list()
+
+ def _build_upload_list(self):
+ """
+ Scan the requested paths to count file sizes, exclude files and
+ directories as requested, and build the upload file list.
+ """
+ # If there aren't any special files to be read, reset the total bytes
+ # count to zero to start counting.
+ if all(os.path.isfile(p) or os.path.isdir(p) for p in self.paths):
+ self.bytes_expected = 0
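+ # Otherwise bytes_expected stays None: special files (e.g. stdin or
+ # devices) have no predictable size, so progress can't be shown as a
+ # percentage.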
+
+ for path in self.paths:
+ # Test for stdin first, in case some file named '-' exists
+ if path == '-':
+ if self.dry_run:
+ raise ArvPutUploadIsPending()
+ self._write_stdin(self.filename or 'stdin')
+ elif not os.path.exists(path):
+ raise PathDoesNotExistError("file or directory '{}' does not exist.".format(path))
+ elif os.path.isdir(path):
+ # Use absolute paths on cache index so CWD doesn't interfere
+ # with the caching logic.
+ orig_path = path
+ path = os.path.abspath(path)
+ if orig_path[-1:] == os.sep:
+ # When passing a directory reference with a trailing slash,
+ # its contents should be uploaded directly to the collection's root.
+ prefixdir = path
+ else:
+ # When passing a directory reference with no trailing slash,
+ # upload the directory to the collection's root.
+ prefixdir = os.path.dirname(path)
+ prefixdir += os.sep
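+ # For example, uploading 'dir/' stores 'dir/file' as 'file' at the
+ # collection's root, while 'dir' (no trailing slash) stores it as
+ # 'dir/file'.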
+ for root, dirs, files in os.walk(path, followlinks=self.follow_links):
+ root_relpath = os.path.relpath(root, path)
+ # Exclude files/dirs by full path matching pattern
+ if self.exclude_paths:
+ # List comprehensions (rather than filter()) keep 'files' a real
+ # list, so the sort() below also works under Python 3.
+ dirs[:] = [d for d in dirs if not any(
+ pathname_match(os.path.join(root_relpath, d), pat)
+ for pat in self.exclude_paths)]
+ files = [f for f in files if not any(
+ pathname_match(os.path.join(root_relpath, f), pat)
+ for pat in self.exclude_paths)]
+ # Exclude files/dirs by name matching pattern
+ if self.exclude_names is not None:
+ dirs[:] = [d for d in dirs if not self.exclude_names.match(d)]
+ files = [f for f in files if not self.exclude_names.match(f)]
+ # Make os.walk()'s dir traversing order deterministic
+ dirs.sort()
+ files.sort()
+ for f in files:
+ filepath = os.path.join(root, f)
+ # Add its size to the total bytes count (if applicable)
+ if self.follow_links or (not os.path.islink(filepath)):
+ if self.bytes_expected is not None:
+ self.bytes_expected += os.path.getsize(filepath)
+ self._check_file(filepath,
+ os.path.join(root[len(prefixdir):], f))
+ else:
+ filepath = os.path.abspath(path)
+ # Add its size to the total bytes count (if applicable)
+ if self.follow_links or (not os.path.islink(filepath)):
+ if self.bytes_expected is not None:
+ self.bytes_expected += os.path.getsize(filepath)
+ self._check_file(filepath,
+ self.filename or os.path.basename(path))
+ # If dry-run mode is on and we got this far, notify that there aren't
+ # any files to upload.
+ if self.dry_run:
+ raise ArvPutUploadNotPending()
+ # Remove local_collection's files that don't exist locally anymore, so the
+ # bytes_written count is correct.
+ for f in self.collection_file_paths(self._local_collection,
+ path_prefix=""):
+ if f != 'stdin' and f != self.filename and f not in self._file_paths:
+ self._local_collection.remove(f)
+
def start(self, save_collection):
"""
Start supporting thread & file uploading
"""
- if not self.dry_run:
- self._checkpointer.start()
+ self._checkpointer.start()
try:
- for path in self.paths:
- # Test for stdin first, in case some file named '-' exist
- if path == '-':
- if self.dry_run:
- raise ArvPutUploadIsPending()
- self._write_stdin(self.filename or 'stdin')
- elif not os.path.exists(path):
- raise PathDoesNotExistError("file or directory '{}' does not exist.".format(path))
- elif os.path.isdir(path):
- # Use absolute paths on cache index so CWD doesn't interfere
- # with the caching logic.
- orig_path = path
- path = os.path.abspath(path)
- if orig_path[-1:] == os.sep:
- # When passing a directory reference with a trailing slash,
- # its contents should be uploaded directly to the collection's root.
- prefixdir = path
- else:
- # When passing a directory reference with no trailing slash,
- # upload the directory to the collection's root.
- prefixdir = os.path.dirname(path)
- prefixdir += os.sep
- for root, dirs, files in os.walk(path, followlinks=self.follow_links):
- root_relpath = os.path.relpath(root, path)
- # Exclude files/dirs by full path matching pattern
- if self.exclude_paths:
- dirs[:] = filter(
- lambda d: not any([pathname_match(os.path.join(root_relpath, d),
- pat)
- for pat in self.exclude_paths]),
- dirs)
- files = filter(
- lambda f: not any([pathname_match(os.path.join(root_relpath, f),
- pat)
- for pat in self.exclude_paths]),
- files)
- # Exclude files/dirs by name matching pattern
- if self.exclude_names is not None:
- dirs[:] = filter(lambda d: not self.exclude_names.match(d), dirs)
- files = filter(lambda f: not self.exclude_names.match(f), files)
- # Make os.walk()'s dir traversing order deterministic
- dirs.sort()
- files.sort()
- for f in files:
- self._check_file(os.path.join(root, f),
- os.path.join(root[len(prefixdir):], f))
- else:
- self._check_file(os.path.abspath(path),
- self.filename or os.path.basename(path))
- # If dry-mode is on, and got up to this point, then we should notify that
- # there aren't any file to upload.
- if self.dry_run:
- raise ArvPutUploadNotPending()
- # Remove local_collection's files that don't exist locally anymore, so the
- # bytes_written count is correct.
- for f in self.collection_file_paths(self._local_collection,
- path_prefix=""):
- if f != 'stdin' and f != self.filename and not f in self._file_paths:
- self._local_collection.remove(f)
# Update bytes_written from current local collection and
# report initial progress.
self._update()
should_upload = True
if should_upload:
- self._files_to_upload.append((source, resume_offset, filename))
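+ # _files_to_upload is assumed to be a list-like helper whose append()
+ # raises ArvPutUploadIsPending when dry_run is enabled.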
+ try:
+ self._files_to_upload.append((source, resume_offset, filename))
+ except ArvPutUploadIsPending:
+ # This could happen when running in dry-run mode; close the cache
+ # file to avoid locking issues.
+ self._cache_file.close()
+ raise
def _upload_files(self):
for source, resume_offset, filename in self._files_to_upload:
datablocks = self._datablocks_on_item(self._my_collection())
return datablocks
-def expected_bytes_for(pathlist, follow_links=True, exclude={}):
- # Walk the given directory trees and stat files, adding up file sizes,
- # so we can display progress as percent
- bytesum = 0
- exclude_paths = exclude.get('paths', None)
- exclude_names = exclude.get('names', None)
- for path in pathlist:
- if os.path.isdir(path):
- for root, dirs, files in os.walk(path, followlinks=follow_links):
- root_relpath = os.path.relpath(root, path)
- # Exclude files/dirs by full path matching pattern
- if exclude_paths is not None:
- dirs[:] = filter(
- lambda d: not any([pathname_match(os.path.join(root_relpath, d),
- pat)
- for pat in exclude_paths]),
- dirs)
- files = filter(
- lambda f: not any([pathname_match(os.path.join(root_relpath, f),
- pat)
- for pat in exclude_paths]),
- files)
- # Exclude files/dirs by name matching pattern
- if exclude_names is not None:
- dirs[:] = filter(lambda d: not exclude_names.match(d), dirs)
- files = filter(lambda f: not exclude_names.match(f), files)
- # Sum file sizes
- for f in files:
- filepath = os.path.join(root, f)
- # Ignore symlinked files when requested
- if (not follow_links) and os.path.islink(filepath):
- continue
- bytesum += os.path.getsize(filepath)
- elif not os.path.isfile(path):
- return None
- else:
- bytesum += os.path.getsize(path)
- return bytesum
-
_machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
os.getpid())
# uploaded, the expected bytes calculation can take a moment.
if args.progress and any([os.path.isdir(f) for f in args.paths]):
logger.info("Calculating upload size, this could take some time...")
- bytes_expected = expected_bytes_for(args.paths,
- follow_links=args.follow_links,
- exclude={'paths': exclude_paths,
- 'names': exclude_names})
-
-
try:
writer = ArvPutUploadJob(paths = args.paths,
resume = args.resume,
use_cache = args.use_cache,
filename = args.filename,
reporter = reporter,
- bytes_expected = bytes_expected,
num_retries = args.retries,
replication_desired = args.replication,
put_threads = args.threads,
except ArvPutUploadNotPending:
# No files pending for upload
sys.exit(0)
+ except PathDoesNotExistError as error:
+ logger.error("arv-put: %s" % str(error))
+ sys.exit(1)
# Install our signal handler for each code in CAUGHT_SIGNALS, and save
# the originals.
logger.error("\n".join([
"arv-put: %s" % str(error)]))
sys.exit(1)
- except ArvPutUploadIsPending:
- # Dry run check successful, return proper exit code.
- sys.exit(2)
- except ArvPutUploadNotPending:
- # No files pending for upload
- sys.exit(0)
- except PathDoesNotExistError as error:
- logger.error("\n".join([
- "arv-put: %s" % str(error)]))
- sys.exit(1)
if args.progress: # Print newline to split stderr from stdout for humans.
logger.info("\n")
def test_passing_nonexistant_path_raise_exception(self):
uuid_str = str(uuid.uuid4())
- cwriter = arv_put.ArvPutUploadJob(["/this/path/does/not/exist/{}".format(uuid_str)])
with self.assertRaises(arv_put.PathDoesNotExistError):
- cwriter.start(save_collection=False)
+ cwriter = arv_put.ArvPutUploadJob(["/this/path/does/not/exist/{}".format(uuid_str)])
def test_writer_works_without_cache(self):
cwriter = arv_put.ArvPutUploadJob(['/dev/null'], resume=False)
for expect_count in (None, 8):
progression, reporter = self.make_progress_tester()
cwriter = arv_put.ArvPutUploadJob([f.name],
- reporter=reporter, bytes_expected=expect_count)
+ reporter=reporter)
+ cwriter.bytes_expected = expect_count
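+ # Overriding the value computed at construction time exercises both
+ # the known-total and unknown-total reporting cases.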
cwriter.start(save_collection=False)
cwriter.destroy_cache()
self.assertIn((3, expect_count), progression)
self.assertGreater(writer.bytes_written, 0)
self.assertLess(writer.bytes_written,
os.path.getsize(self.large_file_name))
- # Retry the upload using dry_run to check if there is a pending upload
- writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
- replication_desired=1,
- dry_run=True)
with self.assertRaises(arv_put.ArvPutUploadIsPending):
- writer2.start(save_collection=False)
+ # Retry the upload using dry_run to check if there is a pending upload
+ writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
+ replication_desired=1,
+ dry_run=True)
# Complete the pending upload
writer3 = arv_put.ArvPutUploadJob([self.large_file_name],
replication_desired=1)
writer3.start(save_collection=False)
- # Confirm there's no pending upload with dry_run=True
- writer4 = arv_put.ArvPutUploadJob([self.large_file_name],
- replication_desired=1,
- dry_run=True)
with self.assertRaises(arv_put.ArvPutUploadNotPending):
- writer4.start(save_collection=False)
- writer4.destroy_cache()
+ # Confirm there's no pending upload with dry_run=True
+ writer4 = arv_put.ArvPutUploadJob([self.large_file_name],
+ replication_desired=1,
+ dry_run=True)
# Test obvious cases
with self.assertRaises(arv_put.ArvPutUploadIsPending):
arv_put.ArvPutUploadJob([self.large_file_name],
TEST_SIZE = os.path.getsize(__file__)
def test_expected_bytes_for_file(self):
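+ # bytes_expected is now computed by ArvPutUploadJob itself, replacing
+ # the removed expected_bytes_for() helper.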
+ writer = arv_put.ArvPutUploadJob([__file__])
self.assertEqual(self.TEST_SIZE,
- arv_put.expected_bytes_for([__file__]))
+ writer.bytes_expected)
def test_expected_bytes_for_tree(self):
tree = self.make_tmpdir()
shutil.copyfile(__file__, os.path.join(tree, 'one'))
shutil.copyfile(__file__, os.path.join(tree, 'two'))
+
+ writer = arv_put.ArvPutUploadJob([tree])
self.assertEqual(self.TEST_SIZE * 2,
- arv_put.expected_bytes_for([tree]))
+ writer.bytes_expected)
+ writer = arv_put.ArvPutUploadJob([tree, __file__])
self.assertEqual(self.TEST_SIZE * 3,
- arv_put.expected_bytes_for([tree, __file__]))
+ writer.bytes_expected)
def test_expected_bytes_for_device(self):
- self.assertIsNone(arv_put.expected_bytes_for(['/dev/null']))
- self.assertIsNone(arv_put.expected_bytes_for([__file__, '/dev/null']))
+ writer = arv_put.ArvPutUploadJob(['/dev/null'])
+ self.assertIsNone(writer.bytes_expected)
+ writer = arv_put.ArvPutUploadJob([__file__, '/dev/null'])
+ self.assertIsNone(writer.bytes_expected)
class ArvadosPutReportTest(ArvadosBaseTestCase):