import arvados
import arvados.collection
import base64
+import ciso8601
import copy
import datetime
import errno
Do not save upload state in a cache file for resuming.
""")
+_group = upload_opts.add_mutually_exclusive_group()
+_group.add_argument('--trash-at', metavar='YYYY-MM-DDTHH:MM', default=None,
+ help="""
+Set the trash date of the resulting collection to an absolute date in the future.
+The accepted format is defined by the ISO 8601 standard. Examples: 20090103, 2009-01-03, 20090103T181505, 2009-01-03T18:15:05.
+Timezone information may be included. If omitted, the date/time is assumed to be in the local system's timezone.
+""")
+_group.add_argument('--trash-after', type=int, metavar='DAYS', default=None,
+ help="""
+Set the trash date of the resulting collection to a number of days after the
+date/time at which the upload process finishes.
+""")
+
arg_parser = argparse.ArgumentParser(
description='Copy data from the local filesystem to Keep.',
parents=[upload_opts, run_opts, arv_cmd.retry_opt])
put_threads=None, replication_desired=None, filename=None,
update_time=60.0, update_collection=None, storage_classes=None,
logger=logging.getLogger('arvados.arv_put'), dry_run=False,
- follow_links=True, exclude_paths=[], exclude_names=None):
+ follow_links=True, exclude_paths=[], exclude_names=None,
+ trash_at=None):
self.paths = paths
self.resume = resume
self.use_cache = use_cache
self.follow_links = follow_links
self.exclude_paths = exclude_paths
self.exclude_names = exclude_names
+ self._trash_at = trash_at
+
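+ # trash_at may be either an absolute timezone-naive datetime (the CLI converts these to UTC) or a relative timedelta; anything else is rejected below.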
+ if self._trash_at is not None:
+ if type(self._trash_at) not in [datetime.datetime, datetime.timedelta]:
+ raise TypeError('trash_at must be None, a timezone-naive datetime, or a timedelta')
+ if type(self._trash_at) == datetime.datetime and self._trash_at.tzinfo is not None:
+ raise TypeError('provided trash_at datetime must be timezone-naive')
if not self.use_cache and self.resume:
raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')
if self.use_cache:
self._cache_file.close()
+ def _collection_trash_at(self):
+ """
+ Returns the trash date that the collection should use at save time.
+ Takes into account absolute/relative trash_at values requested
+ by the user.
+ """
+ if type(self._trash_at) == datetime.timedelta:
+ # Get an absolute datetime for trash_at
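+ # utcnow() yields a timezone-naive UTC datetime, consistent with the absolute trash_at values the CLI prepares.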
+ return datetime.datetime.utcnow() + self._trash_at
+ return self._trash_at
+
def save_collection(self):
if self.update:
# Check if files should be updated on the remote collection.
# The file already exists on the remote collection; skip it.
pass
self._remote_collection.save(storage_classes=self.storage_classes,
- num_retries=self.num_retries)
+ num_retries=self.num_retries,
+ trash_at=self._collection_trash_at())
else:
if self.storage_classes is None:
self.storage_classes = ['default']
name=self.name, owner_uuid=self.owner_uuid,
storage_classes=self.storage_classes,
ensure_unique_name=self.ensure_unique_name,
- num_retries=self.num_retries)
+ num_retries=self.num_retries,
+ trash_at=self._collection_trash_at())
def destroy_cache(self):
if self.use_cache:
self._save_state()
except Exception as e:
self.logger.error("Unexpected error trying to save cache file: {}".format(e))
+ # Keep the remote collection's trash_at attribute synced when using relative expiration dates
+ if self._remote_collection is not None and type(self._trash_at) == datetime.timedelta:
+ try:
+ self._api_client.collections().update(
+ uuid=self._remote_collection.manifest_locator(),
+ body={'trash_at': self._collection_trash_at().strftime("%Y-%m-%dT%H:%M:%S.%fZ")}
+ ).execute(num_retries=self.num_retries)
+ except Exception as e:
+ self.logger.error("Unexpected error trying to update remote collection's expire date: {}".format(e))
else:
self.bytes_written = self.bytes_skipped
# Call the reporter, if any
def collection_name(self):
return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None
+ def collection_trash_at(self):
+ return self._my_collection().get_trash_at()
+
def manifest_locator(self):
return self._my_collection().manifest_locator()
if install_sig_handlers:
arv_cmd.install_signal_handlers()
+ # Validate the trash-related arguments
+ trash_at = None
+ if args.trash_at is not None:
+ # ciso8601 considers YYYYMM as invalid but YYYY-MM as valid, so here we
+ # make sure the user provides a complete YYYY-MM-DD date.
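+ # The (?P=dash) backreference forces both separators to match, so '20090103' and '2009-01-03' pass while mixed forms like '2009-0103' are rejected.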
+ if not re.match(r'^\d{4}(?P<dash>-?)\d{2}(?P=dash)\d{2}', args.trash_at):
+ logger.error("--trash-at argument format invalid, use --help to see examples.")
+ sys.exit(1)
+ # Check if no time information was provided. In that case, assume end-of-day.
+ if re.match(r'^\d{4}(?P<dash>-?)\d{2}(?P=dash)\d{2}$', args.trash_at):
+ args.trash_at += 'T23:59:59'
+ try:
+ trash_at = ciso8601.parse_datetime(args.trash_at)
+ except ValueError:
+ logger.error("--trash-at argument format invalid, use --help to see examples.")
+ sys.exit(1)
+ else:
+ if trash_at.tzinfo is not None:
+ # Timezone-aware datetime provided.
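+ # utcoffset() is the local-minus-UTC offset; negating it makes the addition below yield UTC.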
+ utcoffset = -trash_at.utcoffset()
+ else:
+ # Timezone-naive datetime provided; assume it's in the local timezone.
+ if time.daylight:
+ utcoffset = datetime.timedelta(seconds=time.altzone)
+ else:
+ utcoffset = datetime.timedelta(seconds=time.timezone)
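+ # time.timezone and time.altzone are seconds *west* of UTC, so adding the offset converts local time to UTC.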
+ # Convert to UTC timezone naive datetime.
+ trash_at = trash_at.replace(tzinfo=None) + utcoffset
+
+ if trash_at <= datetime.datetime.utcnow():
+ logger.error("--trash-at argument must be set in the future")
+ sys.exit(1)
+ if args.trash_after is not None:
+ if args.trash_after < 1:
+ logger.error("--trash-after argument must be >= 1")
+ sys.exit(1)
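+ # Keep the value relative; _collection_trash_at() resolves the timedelta to an absolute UTC datetime at save time.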
+ trash_at = datetime.timedelta(days=args.trash_after)
+
# Determine the name to use
if args.name:
if args.stream or args.raw:
dry_run=args.dry_run,
follow_links=args.follow_links,
exclude_paths=exclude_paths,
- exclude_names=exclude_names)
+ exclude_names=exclude_names,
+ trash_at=trash_at)
except ResumeCacheConflict:
logger.error("\n".join([
"arv-put: Another process is already uploading this data.",
" --no-resume to start a new resume cache.",
" --no-cache to disable resume cache."]))
sys.exit(1)
- except CollectionUpdateError as error:
+ except (CollectionUpdateError, PathDoesNotExistError) as error:
logger.error("\n".join([
"arv-put: %s" % str(error)]))
sys.exit(1)
except ArvPutUploadNotPending:
# No files pending for upload
sys.exit(0)
- except PathDoesNotExistError as error:
- logger.error("\n".join([
- "arv-put: %s" % str(error)]))
- sys.exit(1)
if not args.dry_run and not args.update_collection and args.resume and writer.bytes_written > 0:
logger.warning("\n".join([
output = ','.join(writer.data_locators())
else:
try:
+ expiration_notice = ""
+ if writer.collection_trash_at() is not None:
+ # Get the local timezone-naive version, and log it with timezone information.
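+ # The stored trash_at is UTC; stripping tzinfo and subtracting the seconds-west offset gives local wall-clock time for display.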
+ if time.daylight:
+ local_trash_at = writer.collection_trash_at().replace(tzinfo=None) - datetime.timedelta(seconds=time.altzone)
+ else:
+ local_trash_at = writer.collection_trash_at().replace(tzinfo=None) - datetime.timedelta(seconds=time.timezone)
+ expiration_notice = ". It will expire on {} {}.".format(
+ local_trash_at.strftime("%Y-%m-%d %H:%M:%S"), time.strftime("%z"))
if args.update_collection:
- logger.info(u"Collection updated: '{}'".format(writer.collection_name()))
+ logger.info(u"Collection updated: '{}'{}".format(
+ writer.collection_name(), expiration_notice))
else:
- logger.info(u"Collection saved as '{}'".format(writer.collection_name()))
+ logger.info(u"Collection saved as '{}'{}".format(
+ writer.collection_name(), expiration_notice))
if args.portable_data_hash:
output = writer.portable_data_hash()
else:
from builtins import range
from functools import partial
import apiclient
+import ciso8601
import datetime
import hashlib
import json
def test_cache_is_locked(self):
with tempfile.NamedTemporaryFile() as cachefile:
- cache = arv_put.ResumeCache(cachefile.name)
+ _ = arv_put.ResumeCache(cachefile.name)
self.assertRaises(arv_put.ResumeCacheConflict,
arv_put.ResumeCache, cachefile.name)
def test_passing_nonexistant_path_raise_exception(self):
uuid_str = str(uuid.uuid4())
with self.assertRaises(arv_put.PathDoesNotExistError):
- cwriter = arv_put.ArvPutUploadJob(["/this/path/does/not/exist/{}".format(uuid_str)])
+ arv_put.ArvPutUploadJob(["/this/path/does/not/exist/{}".format(uuid_str)])
def test_writer_works_without_cache(self):
cwriter = arv_put.ArvPutUploadJob(['/dev/null'], resume=False)
fake_httplib2_response(403), b'{}')
with mock.patch('arvados.collection.Collection.save_new',
new=coll_save_mock):
- with self.assertRaises(SystemExit) as exc_test:
+ with self.assertRaises(SystemExit):
self.call_main_with_args(['/dev/null'])
self.assertRegex(
self.main_stderr.getvalue(), matcher)
BAD_UUID = 'zzzzz-tpzed-zzzzzzzzzzzzzzz'
self.authorize_with('active')
with self.assertRaises(apiclient.errors.HttpError):
- result = arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID,
+ arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID,
0)
def test_short_put_from_stdin(self):
# we're about to create is not present in our test fixture.
manifest_uuid = "00b4e9f40ac4dd432ef89749f1c01e74+47"
with self.assertRaises(apiclient.errors.HttpError):
- notfound = arv_put.api_client.collections().get(
+ arv_put.api_client.collections().get(
uuid=manifest_uuid).execute()
datadir = self.make_tmpdir()
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env=self.ENVIRON)
- (out, err) = p.communicate()
+ (_, err) = p.communicate()
self.assertRegex(err.decode(), r'INFO: Collection saved as ')
self.assertEqual(p.returncode, 0)
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env=self.ENVIRON)
- (out, err) = p.communicate()
+ (_, err) = p.communicate()
self.assertRegex(err.decode(), r'INFO: Creating new cache file at ')
self.assertEqual(p.returncode, 0)
cache_filepath = re.search(r'INFO: Creating new cache file at (.*)',
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env=self.ENVIRON)
- (out, err) = p.communicate()
+ (_, err) = p.communicate()
self.assertRegex(
err.decode(),
r'INFO: Cache expired, starting from scratch.*')
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env=self.ENVIRON)
- (out, err) = p.communicate()
+ (_, err) = p.communicate()
self.assertRegex(err.decode(), r'INFO: Creating new cache file at ')
self.assertEqual(p.returncode, 0)
cache_filepath = re.search(r'INFO: Creating new cache file at (.*)',
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env=self.ENVIRON)
- (out, err) = p.communicate()
+ (_, err) = p.communicate()
self.assertRegex(
err.decode(),
r'ERROR: arv-put: Resume cache contains invalid signature.*')
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env=self.ENVIRON)
- (out, err) = p.communicate()
+ (_, err) = p.communicate()
self.assertRegex(err.decode(), r'INFO: Creating new cache file at ')
self.assertEqual(p.returncode, 0)
cache_filepath = re.search(r'INFO: Creating new cache file at (.*)',
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env=self.ENVIRON)
- (out, err) = p.communicate()
+ (_, err) = p.communicate()
self.assertRegex(
err.decode(),
r'WARNING: Uploaded file \'.*barfile.txt\' access token expired, will re-upload it from scratch')
c = arv_put.api_client.collections().get(uuid=updated_col['uuid']).execute()
self.assertRegex(c['manifest_text'], r'^\..* .*:44:file2\n')
+ def test_put_collection_with_utc_expiring_datetime(self):
+ tmpdir = self.make_tmpdir()
+ trash_at = (datetime.datetime.utcnow() + datetime.timedelta(days=90)).strftime('%Y%m%dT%H%MZ')
+ with open(os.path.join(tmpdir, 'file1'), 'w') as f:
+ f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
+ col = self.run_and_find_collection(
+ "",
+ ['--no-progress', '--trash-at', trash_at, tmpdir])
+ self.assertNotEqual(None, col['uuid'])
+ c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
+ self.assertEqual(ciso8601.parse_datetime(trash_at),
+ ciso8601.parse_datetime(c['trash_at']))
+
+ def test_put_collection_with_timezone_aware_expiring_datetime(self):
+ tmpdir = self.make_tmpdir()
+ trash_at = (datetime.datetime.utcnow() + datetime.timedelta(days=90)).strftime('%Y%m%dT%H%M-0300')
+ with open(os.path.join(tmpdir, 'file1'), 'w') as f:
+ f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
+ col = self.run_and_find_collection(
+ "",
+ ['--no-progress', '--trash-at', trash_at, tmpdir])
+ self.assertNotEqual(None, col['uuid'])
+ c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
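+ # A -0300 timestamp is three hours behind UTC, so adding 3 hours should equal the stored UTC trash_at.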
+ self.assertEqual(
+ ciso8601.parse_datetime(trash_at).replace(tzinfo=None) + datetime.timedelta(hours=3),
+ ciso8601.parse_datetime(c['trash_at']).replace(tzinfo=None))
+
+ def test_put_collection_with_timezone_naive_expiring_datetime(self):
+ tmpdir = self.make_tmpdir()
+ trash_at = (datetime.datetime.utcnow() + datetime.timedelta(days=90)).strftime('%Y%m%dT%H%M')
+ with open(os.path.join(tmpdir, 'file1'), 'w') as f:
+ f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
+ col = self.run_and_find_collection(
+ "",
+ ['--no-progress', '--trash-at', trash_at, tmpdir])
+ self.assertNotEqual(None, col['uuid'])
+ c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
+ if time.daylight:
+ offset = datetime.timedelta(seconds=time.altzone)
+ else:
+ offset = datetime.timedelta(seconds=time.timezone)
+ self.assertEqual(
+ ciso8601.parse_datetime(trash_at) + offset,
+ ciso8601.parse_datetime(c['trash_at']).replace(tzinfo=None))
+
+ def test_put_collection_with_expiring_date_only(self):
+ tmpdir = self.make_tmpdir()
+ trash_at = '2140-01-01'
+ end_of_day = datetime.timedelta(hours=23, minutes=59, seconds=59)
+ with open(os.path.join(tmpdir, 'file1'), 'w') as f:
+ f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
+ col = self.run_and_find_collection(
+ "",
+ ['--no-progress', '--trash-at', trash_at, tmpdir])
+ self.assertNotEqual(None, col['uuid'])
+ c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
+ if time.daylight:
+ offset = datetime.timedelta(seconds=time.altzone)
+ else:
+ offset = datetime.timedelta(seconds=time.timezone)
+ self.assertEqual(
+ ciso8601.parse_datetime(trash_at) + end_of_day + offset,
+ ciso8601.parse_datetime(c['trash_at']).replace(tzinfo=None))
+
+ def test_put_collection_with_invalid_absolute_expiring_datetimes(self):
+ cases = ['2100', '210010', '2100-10', '2100-Oct']
+ tmpdir = self.make_tmpdir()
+ with open(os.path.join(tmpdir, 'file1'), 'w') as f:
+ f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
+ for test_datetime in cases:
+ with self.assertRaises(AssertionError):
+ self.run_and_find_collection(
+ "",
+ ['--no-progress', '--trash-at', test_datetime, tmpdir])
+
+ def test_put_collection_with_relative_expiring_datetime(self):
+ expire_after = 7
+ dt_before = datetime.datetime.utcnow() + datetime.timedelta(days=expire_after)
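+ # trash_at is computed when the upload finishes, so it must land between dt_before and dt_after.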
+ tmpdir = self.make_tmpdir()
+ with open(os.path.join(tmpdir, 'file1'), 'w') as f:
+ f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
+ col = self.run_and_find_collection(
+ "",
+ ['--no-progress', '--trash-after', str(expire_after), tmpdir])
+ self.assertNotEqual(None, col['uuid'])
+ dt_after = datetime.datetime.utcnow() + datetime.timedelta(days=expire_after)
+ c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
+ trash_at = ciso8601.parse_datetime(c['trash_at']).replace(tzinfo=None)
+ self.assertLess(dt_before, trash_at)
+ self.assertGreater(dt_after, trash_at)
+
+ def test_put_collection_with_invalid_relative_expiring_datetime(self):
+ expire_after = 0 # Must be >= 1
+ tmpdir = self.make_tmpdir()
+ with open(os.path.join(tmpdir, 'file1'), 'w') as f:
+ f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
+ with self.assertRaises(AssertionError):
+ self.run_and_find_collection(
+ "",
+ ['--no-progress', '--trash-after', str(expire_after), tmpdir])
+
def test_upload_directory_reference_without_trailing_slash(self):
tmpdir1 = self.make_tmpdir()
tmpdir2 = self.make_tmpdir()