import arvados
import arvados.collection
import base64
+import ciso8601
import copy
import datetime
import errno
_group.add_argument('--stream', action='store_true',
help="""
Store the file content and display the resulting manifest on
-stdout. Do not write the manifest to Keep or save a Collection object
-in Arvados.
+stdout. Do not save a Collection object in Arvados.
""")
_group.add_argument('--as-manifest', action='store_true', dest='manifest',
overall throughput.
""")
+upload_opts.add_argument('--exclude', metavar='PATTERN', default=[],
+ action='append', help="""
+Exclude files and directories whose names match the given glob pattern. When
+using a path-like pattern like 'subdir/*.txt', all text files inside 'subdir'
+directory, relative to the provided input dirs, will be excluded.
+When using a filename pattern like '*.txt', any text file will be excluded
+no matter where it is placed.
+For the special case of needing to exclude only files or dirs directly below
+the given input directory, you can use a pattern like './exclude_this.gif'.
+You can specify multiple patterns by using this argument more than once.
+""")
+
+# Symlink handling for the upload scan: exactly one of --follow-links /
+# --no-follow-links may be given; following symlinks is the default.
+_group = upload_opts.add_mutually_exclusive_group()
+_group.add_argument('--follow-links', action='store_true', default=True,
+                    dest='follow_links', help="""
+Follow file and directory symlinks (default).
+""")
+_group.add_argument('--no-follow-links', action='store_false', dest='follow_links',
+                    help="""
+Ignore file and directory symlinks. Even paths given explicitly on the
+command line will be skipped if they are symlinks.
+""")
+
+
run_opts = argparse.ArgumentParser(add_help=False)
run_opts.add_argument('--project-uuid', metavar='UUID', help="""
Save the collection with the specified name.
""")
-run_opts.add_argument('--exclude', metavar='PATTERN', default=[],
- action='append', help="""
-Exclude files and directories whose names match the given glob pattern. When
-using a path-like pattern like 'subdir/*.txt', all text files inside 'subdir'
-directory, relative to the provided input dirs will be excluded.
-When using a filename pattern like '*.txt', any text file will be excluded
-no matter where is placed.
-For the special case of needing to exclude only files or dirs directly below
-the given input directory, you can use a pattern like './exclude_this.gif'.
-You can specify multiple patterns by using this argument more than once.
-""")
-
_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--progress', action='store_true',
help="""
still be displayed.)
""")
+run_opts.add_argument('--batch', action='store_true', default=False,
+ help="""
+Retries with '--no-resume --no-cache' if cached state contains invalid/expired
+block signatures.
+""")
+
_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--resume', action='store_true', default=True,
help="""
Do not continue interrupted uploads from cached state.
""")
-_group = run_opts.add_mutually_exclusive_group()
-_group.add_argument('--follow-links', action='store_true', default=True,
- dest='follow_links', help="""
-Follow file and directory symlinks (default).
-""")
-_group.add_argument('--no-follow-links', action='store_false', dest='follow_links',
- help="""
-Do not follow file and directory symlinks.
-""")
-
_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--cache', action='store_true', dest='use_cache', default=True,
help="""
Do not save upload state in a cache file for resuming.
""")
+_group = upload_opts.add_mutually_exclusive_group()
+_group.add_argument('--trash-at', metavar='YYYY-MM-DDTHH:MM', default=None,
+ help="""
+Set the trash date of the resulting collection to an absolute date in the future.
+The accepted format is defined by the ISO 8601 standard. Examples: 20090103, 2009-01-03, 20090103T181505, 2009-01-03T18:15:05.\n
+Timezone information can be added. If not, the provided date/time is assumed to be in the local system's timezone.
+""")
+_group.add_argument('--trash-after', type=int, metavar='DAYS', default=None,
+ help="""
+Set the trash date of the resulting collection to a number of days from the
+date/time that the upload process finishes.
+""")
+
arg_parser = argparse.ArgumentParser(
description='Copy data from the local filesystem to Keep.',
parents=[upload_opts, run_opts, arv_cmd.retry_opt])
args.paths = ["-" if x == "/dev/stdin" else x for x in args.paths]
- if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
- if args.filename:
- arg_parser.error("""
+ if args.filename and (len(args.paths) != 1 or os.path.isdir(args.paths[0])):
+ arg_parser.error("""
--filename argument cannot be used when storing a directory or
multiple files.
""")
try:
fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
except IOError:
- raise ResumeCacheConflict("{} locked".format(fileobj.name))
+ raise ResumeCacheConflict(u"{} locked".format(fileobj.name))
def load(self):
self.cache_file.seek(0)
}
def __init__(self, paths, resume=True, use_cache=True, reporter=None,
- name=None, owner_uuid=None, api_client=None,
+ name=None, owner_uuid=None, api_client=None, batch_mode=False,
ensure_unique_name=False, num_retries=None,
put_threads=None, replication_desired=None, filename=None,
update_time=60.0, update_collection=None, storage_classes=None,
logger=logging.getLogger('arvados.arv_put'), dry_run=False,
- follow_links=True, exclude_paths=[], exclude_names=None):
+ follow_links=True, exclude_paths=[], exclude_names=None,
+ trash_at=None):
self.paths = paths
self.resume = resume
self.use_cache = use_cache
+ self.batch_mode = batch_mode
self.update = False
self.reporter = reporter
# This will set to 0 before start counting, if no special files are going
self.follow_links = follow_links
self.exclude_paths = exclude_paths
self.exclude_names = exclude_names
+ self._trash_at = trash_at
+
+ if self._trash_at is not None:
+ if type(self._trash_at) not in [datetime.datetime, datetime.timedelta]:
+ raise TypeError('trash_at should be None, timezone-naive datetime or timedelta')
+ if type(self._trash_at) == datetime.datetime and self._trash_at.tzinfo is not None:
+ raise TypeError('provided trash_at datetime should be timezone-naive')
if not self.use_cache and self.resume:
raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')
raise ArvPutUploadIsPending()
self._write_stdin(self.filename or 'stdin')
elif not os.path.exists(path):
- raise PathDoesNotExistError("file or directory '{}' does not exist.".format(path))
+ raise PathDoesNotExistError(u"file or directory '{}' does not exist.".format(path))
+ elif (not self.follow_links) and os.path.islink(path):
+ self.logger.warning("Skipping symlink '{}'".format(path))
+ continue
elif os.path.isdir(path):
# Use absolute paths on cache index so CWD doesn't interfere
# with the caching logic.
if self.use_cache:
self._cache_file.close()
+ def _collection_trash_at(self):
+ """
+ Returns the trash date that the collection should use at save time.
+ Takes into account absolute/relative trash_at values requested
+ by the user.
+ """
+ if type(self._trash_at) == datetime.timedelta:
+ # Get an absolute datetime for trash_at
+ return datetime.datetime.utcnow() + self._trash_at
+ return self._trash_at
+
def save_collection(self):
if self.update:
# Check if files should be updated on the remote collection.
else:
# The file already exist on remote collection, skip it.
pass
- self._remote_collection.save(storage_classes=self.storage_classes,
- num_retries=self.num_retries)
+ self._remote_collection.save(num_retries=self.num_retries,
+ trash_at=self._collection_trash_at())
else:
- if self.storage_classes is None:
- self.storage_classes = ['default']
+ if len(self._local_collection) == 0:
+ self.logger.warning("No files were uploaded, skipping collection creation.")
+ return
self._local_collection.save_new(
name=self.name, owner_uuid=self.owner_uuid,
- storage_classes=self.storage_classes,
ensure_unique_name=self.ensure_unique_name,
- num_retries=self.num_retries)
+ num_retries=self.num_retries,
+ trash_at=self._collection_trash_at())
def destroy_cache(self):
if self.use_cache:
self._save_state()
except Exception as e:
self.logger.error("Unexpected error trying to save cache file: {}".format(e))
+ # Keep remote collection's trash_at attribute synced when using relative expire dates
+ if self._remote_collection is not None and type(self._trash_at) == datetime.timedelta:
+ try:
+ self._api_client.collections().update(
+ uuid=self._remote_collection.manifest_locator(),
+ body={'trash_at': self._collection_trash_at().strftime("%Y-%m-%dT%H:%M:%S.%fZ")}
+ ).execute(num_retries=self.num_retries)
+ except Exception as e:
+ self.logger.error("Unexpected error trying to update remote collection's expire date: {}".format(e))
else:
self.bytes_written = self.bytes_skipped
# Call the reporter, if any
def _write_stdin(self, filename):
output = self._local_collection.open(filename, 'wb')
- self._write(sys.stdin, output)
+ self._write(sys.stdin.buffer, output)
output.close()
def _check_file(self, source, filename):
elif file_in_local_collection.permission_expired():
# Permission token expired, re-upload file. This will change whenever
# we have a API for refreshing tokens.
- self.logger.warning("Uploaded file '{}' access token expired, will re-upload it from scratch".format(filename))
+ self.logger.warning(u"Uploaded file '{}' access token expired, will re-upload it from scratch".format(filename))
should_upload = True
self._local_collection.remove(filename)
elif cached_file_data['size'] == file_in_local_collection.size():
# Inconsistent cache, re-upload the file
should_upload = True
self._local_collection.remove(filename)
- self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
+ self.logger.warning(u"Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
# Local file differs from cached data, re-upload it.
else:
if file_in_local_collection:
update_collection):
try:
self._remote_collection = arvados.collection.Collection(
- update_collection, api_client=self._api_client)
+ update_collection,
+ api_client=self._api_client,
+ storage_classes_desired=self.storage_classes,
+ num_retries=self.num_retries)
except arvados.errors.ApiError as error:
raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
else:
if self.use_cache:
cache_filepath = self._get_cache_filepath()
if self.resume and os.path.exists(cache_filepath):
- self.logger.info("Resuming upload from cache file {}".format(cache_filepath))
+ self.logger.info(u"Resuming upload from cache file {}".format(cache_filepath))
self._cache_file = open(cache_filepath, 'a+')
else:
# --no-resume means start with a empty cache file.
- self.logger.info("Creating new cache file at {}".format(cache_filepath))
+ self.logger.info(u"Creating new cache file at {}".format(cache_filepath))
self._cache_file = open(cache_filepath, 'w+')
self._cache_filename = self._cache_file.name
self._lock_file(self._cache_file)
# No cache file, set empty state
self._state = copy.deepcopy(self.EMPTY_STATE)
if not self._cached_manifest_valid():
- raise ResumeCacheInvalidError()
+ if not self.batch_mode:
+ raise ResumeCacheInvalidError()
+ else:
+ self.logger.info("Invalid signatures on cache file '{}' while being run in 'batch mode' -- continuing anyways.".format(self._cache_file.name))
+ self.use_cache = False # Don't overwrite preexisting cache file.
+ self._state = copy.deepcopy(self.EMPTY_STATE)
# Load the previous manifest so we can check if files were modified remotely.
self._local_collection = arvados.collection.Collection(
self._state['manifest'],
replication_desired=self.replication_desired,
+ storage_classes_desired=self.storage_classes,
put_threads=self.put_threads,
- api_client=self._api_client)
+ api_client=self._api_client,
+ num_retries=self.num_retries)
def _cached_manifest_valid(self):
"""
try:
fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
except IOError:
- raise ResumeCacheConflict("{} locked".format(fileobj.name))
+ raise ResumeCacheConflict(u"{} locked".format(fileobj.name))
def _save_state(self):
"""
def collection_name(self):
return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None
+ def collection_trash_at(self):
+ return self._my_collection().get_trash_at()
+
def manifest_locator(self):
return self._my_collection().manifest_locator()
if install_sig_handlers:
arv_cmd.install_signal_handlers()
+ # Trash arguments validation
+ trash_at = None
+ if args.trash_at is not None:
+ # ciso8601 considers YYYYMM as invalid but YYYY-MM as valid, so here we
+ # make sure the user provides a complete YYYY-MM-DD date.
+ if not re.match(r'^\d{4}(?P<dash>-?)\d{2}?(?P=dash)\d{2}', args.trash_at):
+ logger.error("--trash-at argument format invalid, use --help to see examples.")
+ sys.exit(1)
+ # Check if no time information was provided. In that case, assume end-of-day.
+ if re.match(r'^\d{4}(?P<dash>-?)\d{2}?(?P=dash)\d{2}$', args.trash_at):
+ args.trash_at += 'T23:59:59'
+ try:
+ trash_at = ciso8601.parse_datetime(args.trash_at)
+ except:
+ logger.error("--trash-at argument format invalid, use --help to see examples.")
+ sys.exit(1)
+ else:
+ if trash_at.tzinfo is not None:
+ # Timezone aware datetime provided.
+ utcoffset = -trash_at.utcoffset()
+ else:
+            # Timezone-naive datetime provided. Assume it is local.
+ if time.daylight:
+ utcoffset = datetime.timedelta(seconds=time.altzone)
+ else:
+ utcoffset = datetime.timedelta(seconds=time.timezone)
+ # Convert to UTC timezone naive datetime.
+ trash_at = trash_at.replace(tzinfo=None) + utcoffset
+
+ if trash_at <= datetime.datetime.utcnow():
+ logger.error("--trash-at argument must be set in the future")
+ sys.exit(1)
+ if args.trash_after is not None:
+ if args.trash_after < 1:
+ logger.error("--trash-after argument must be >= 1")
+ sys.exit(1)
+ trash_at = datetime.timedelta(seconds=(args.trash_after * 24 * 60 * 60))
+
# Determine the name to use
if args.name:
if args.stream or args.raw:
# Split storage-classes argument
storage_classes = None
if args.storage_classes:
- storage_classes = args.storage_classes.strip().split(',')
- if len(storage_classes) > 1:
- logger.error("Multiple storage classes are not supported currently.")
- sys.exit(1)
-
+ storage_classes = args.storage_classes.strip().replace(' ', '').split(',')
# Setup exclude regex from all the --exclude arguments provided
name_patterns = []
writer = ArvPutUploadJob(paths = args.paths,
resume = args.resume,
use_cache = args.use_cache,
+ batch_mode= args.batch,
filename = args.filename,
reporter = reporter,
api_client = api_client,
dry_run=args.dry_run,
follow_links=args.follow_links,
exclude_paths=exclude_paths,
- exclude_names=exclude_names)
+ exclude_names=exclude_names,
+ trash_at=trash_at)
except ResumeCacheConflict:
logger.error("\n".join([
"arv-put: Another process is already uploading this data.",
" or been created with another Arvados user's credentials.",
" Switch user or use one of the following options to restart upload:",
" --no-resume to start a new resume cache.",
- " --no-cache to disable resume cache."]))
+ " --no-cache to disable resume cache.",
+ " --batch to ignore the resume cache if invalid."]))
sys.exit(1)
- except CollectionUpdateError as error:
+ except (CollectionUpdateError, PathDoesNotExistError) as error:
logger.error("\n".join([
"arv-put: %s" % str(error)]))
sys.exit(1)
except ArvPutUploadNotPending:
# No files pending for upload
sys.exit(0)
- except PathDoesNotExistError as error:
- logger.error("\n".join([
- "arv-put: %s" % str(error)]))
- sys.exit(1)
if not args.dry_run and not args.update_collection and args.resume and writer.bytes_written > 0:
logger.warning("\n".join([
output = None
try:
writer.start(save_collection=not(args.stream or args.raw))
- except arvados.errors.ApiError as error:
+ except (arvados.errors.ApiError, arvados.errors.KeepWriteError) as error:
logger.error("\n".join([
"arv-put: %s" % str(error)]))
sys.exit(1)
output = writer.manifest_text()
elif args.raw:
output = ','.join(writer.data_locators())
- else:
+ elif writer.manifest_locator() is not None:
try:
+ expiration_notice = ""
+ if writer.collection_trash_at() is not None:
+ # Get the local timezone-naive version, and log it with timezone information.
+ if time.daylight:
+ local_trash_at = writer.collection_trash_at().replace(tzinfo=None) - datetime.timedelta(seconds=time.altzone)
+ else:
+ local_trash_at = writer.collection_trash_at().replace(tzinfo=None) - datetime.timedelta(seconds=time.timezone)
+ expiration_notice = ". It will expire on {} {}.".format(
+ local_trash_at.strftime("%Y-%m-%d %H:%M:%S"), time.strftime("%z"))
if args.update_collection:
- logger.info("Collection updated: '{}'".format(writer.collection_name()))
+ logger.info(u"Collection updated: '{}'{}".format(
+ writer.collection_name(), expiration_notice))
else:
- logger.info("Collection saved as '{}'".format(writer.collection_name()))
+ logger.info(u"Collection saved as '{}'{}".format(
+ writer.collection_name(), expiration_notice))
if args.portable_data_hash:
output = writer.portable_data_hash()
else:
"arv-put: Error creating Collection on project: {}.".format(
error))
status = 1
+ else:
+ status = 1
# Print the locator (uuid) of the new collection.
if output is None: