_group.add_argument('--stream', action='store_true',
help="""
Store the file content and display the resulting manifest on
-stdout. Do not write the manifest to Keep or save a Collection object
-in Arvados.
+stdout. Do not save a Collection object in Arvados.
""")
_group.add_argument('--as-manifest', action='store_true', dest='manifest',
""")
_group.add_argument('--no-follow-links', action='store_false', dest='follow_links',
help="""
-Do not follow file and directory symlinks.
+Ignore file and directory symlinks. Even paths given explicitly on the
+command line will be skipped if they are symlinks.
""")
still be displayed.)
""")
+run_opts.add_argument('--batch', action='store_true', default=False,
+ help="""
+Retries with '--no-resume --no-cache' if cached state contains invalid/expired
+block signatures.
+""")
+
_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--resume', action='store_true', default=True,
help="""
args.paths = ["-" if x == "/dev/stdin" else x for x in args.paths]
- if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
- if args.filename:
- arg_parser.error("""
+ if args.filename and (len(args.paths) != 1 or os.path.isdir(args.paths[0])):
+ arg_parser.error("""
--filename argument cannot be used when storing a directory or
multiple files.
""")
}
def __init__(self, paths, resume=True, use_cache=True, reporter=None,
- name=None, owner_uuid=None, api_client=None,
+ name=None, owner_uuid=None, api_client=None, batch_mode=False,
ensure_unique_name=False, num_retries=None,
put_threads=None, replication_desired=None, filename=None,
update_time=60.0, update_collection=None, storage_classes=None,
self.paths = paths
self.resume = resume
self.use_cache = use_cache
+ self.batch_mode = batch_mode
self.update = False
self.reporter = reporter
# This will set to 0 before start counting, if no special files are going
self._write_stdin(self.filename or 'stdin')
elif not os.path.exists(path):
raise PathDoesNotExistError(u"file or directory '{}' does not exist.".format(path))
+ elif (not self.follow_links) and os.path.islink(path):
+ self.logger.warning("Skipping symlink '{}'".format(path))
+ continue
elif os.path.isdir(path):
# Use absolute paths on cache index so CWD doesn't interfere
# with the caching logic.
files.sort()
for f in files:
filepath = os.path.join(root, f)
+ if not os.path.isfile(filepath):
+ self.logger.warning("Skipping non-regular file '{}'".format(filepath))
+ continue
# Add its size to the total bytes count (if applicable)
if self.follow_links or (not os.path.islink(filepath)):
if self.bytes_expected is not None:
else:
# The file already exist on remote collection, skip it.
pass
- self._remote_collection.save(storage_classes=self.storage_classes,
- num_retries=self.num_retries,
+ self._remote_collection.save(num_retries=self.num_retries,
trash_at=self._collection_trash_at())
else:
- if self.storage_classes is None:
- self.storage_classes = ['default']
+ if len(self._local_collection) == 0:
+ self.logger.warning("No files were uploaded, skipping collection creation.")
+ return
self._local_collection.save_new(
name=self.name, owner_uuid=self.owner_uuid,
- storage_classes=self.storage_classes,
ensure_unique_name=self.ensure_unique_name,
num_retries=self.num_retries,
trash_at=self._collection_trash_at())
def _write_stdin(self, filename):
output = self._local_collection.open(filename, 'wb')
- self._write(sys.stdin, output)
+ self._write(sys.stdin.buffer, output)
output.close()
def _check_file(self, source, filename):
update_collection):
try:
self._remote_collection = arvados.collection.Collection(
- update_collection, api_client=self._api_client)
+ update_collection,
+ api_client=self._api_client,
+ storage_classes_desired=self.storage_classes,
+ num_retries=self.num_retries)
except arvados.errors.ApiError as error:
raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
else:
# No cache file, set empty state
self._state = copy.deepcopy(self.EMPTY_STATE)
if not self._cached_manifest_valid():
- raise ResumeCacheInvalidError()
+ if not self.batch_mode:
+ raise ResumeCacheInvalidError()
+ else:
+ self.logger.info("Invalid signatures on cache file '{}' while being run in 'batch mode' -- continuing anyways.".format(self._cache_file.name))
+ self.use_cache = False # Don't overwrite preexisting cache file.
+ self._state = copy.deepcopy(self.EMPTY_STATE)
# Load the previous manifest so we can check if files were modified remotely.
self._local_collection = arvados.collection.Collection(
self._state['manifest'],
replication_desired=self.replication_desired,
+ storage_classes_desired=self.storage_classes,
put_threads=self.put_threads,
- api_client=self._api_client)
+ api_client=self._api_client,
+ num_retries=self.num_retries)
def _cached_manifest_valid(self):
"""
logging.getLogger('arvados').handlers[0].setFormatter(formatter)
if api_client is None:
- api_client = arvados.api('v1', request_id=request_id)
+ api_client = arvados.api('v1', request_id=request_id, num_retries=args.retries)
if install_sig_handlers:
arv_cmd.install_signal_handlers()
# Split storage-classes argument
storage_classes = None
if args.storage_classes:
- storage_classes = args.storage_classes.strip().split(',')
- if len(storage_classes) > 1:
- logger.error("Multiple storage classes are not supported currently.")
- sys.exit(1)
-
+ storage_classes = args.storage_classes.strip().replace(' ', '').split(',')
# Setup exclude regex from all the --exclude arguments provided
name_patterns = []
writer = ArvPutUploadJob(paths = args.paths,
resume = args.resume,
use_cache = args.use_cache,
+ batch_mode= args.batch,
filename = args.filename,
reporter = reporter,
api_client = api_client,
" or been created with another Arvados user's credentials.",
" Switch user or use one of the following options to restart upload:",
" --no-resume to start a new resume cache.",
- " --no-cache to disable resume cache."]))
+ " --no-cache to disable resume cache.",
+ " --batch to ignore the resume cache if invalid."]))
sys.exit(1)
except (CollectionUpdateError, PathDoesNotExistError) as error:
logger.error("\n".join([
output = None
try:
writer.start(save_collection=not(args.stream or args.raw))
- except arvados.errors.ApiError as error:
+ except (arvados.errors.ApiError, arvados.errors.KeepWriteError) as error:
logger.error("\n".join([
"arv-put: %s" % str(error)]))
sys.exit(1)
output = writer.manifest_text()
elif args.raw:
output = ','.join(writer.data_locators())
- else:
+ elif writer.manifest_locator() is not None:
try:
expiration_notice = ""
if writer.collection_trash_at() is not None:
"arv-put: Error creating Collection on project: {}.".format(
error))
status = 1
+ else:
+ status = 1
# Print the locator (uuid) of the new collection.
if output is None: