Merge branch 'master' into 14930-arvput-trash-at
authorLucas Di Pentima <ldipentima@veritasgenetics.com>
Mon, 10 Jun 2019 23:15:23 +0000 (20:15 -0300)
committerLucas Di Pentima <ldipentima@veritasgenetics.com>
Mon, 10 Jun 2019 23:15:23 +0000 (20:15 -0300)
Arvados-DCO-1.1-Signed-off-by: Lucas Di Pentima <ldipentima@veritasgenetics.com>

sdk/python/arvados/commands/put.py
sdk/python/tests/test_arv_put.py

index afe75b31056c5c7d872256875a74265012de3b91..5773cb4f98792354c671a3cfb3ecb90f7f92f0f9 100644 (file)
@@ -10,6 +10,7 @@ import argparse
 import arvados
 import arvados.collection
 import base64
+import ciso8601
 import copy
 import datetime
 import errno
@@ -234,6 +235,19 @@ _group.add_argument('--no-cache', action='store_false', dest='use_cache',
 Do not save upload state in a cache file for resuming.
 """)
 
+_group = upload_opts.add_mutually_exclusive_group()
+_group.add_argument('--trash-at', metavar='YYYY-MM-DDTHH:MM', default=None,
+                    help="""
+Set the trash date of the resulting collection to an absolute date in the future.
+The accepted format is defined by the ISO 8601 standard. Examples: 20090103, 2009-01-03, 20090103T181505, 2009-01-03T18:15:05.\n
+Timezone information may be included; if omitted, the date/time is assumed to be in the local system's timezone.
+""")
+_group.add_argument('--trash-after', type=int, metavar='DAYS', default=None,
+                    help="""
+Set the trash date of the resulting collection to the given number of days after
+the date/time the upload process finishes.
+""")
+
 arg_parser = argparse.ArgumentParser(
     description='Copy data from the local filesystem to Keep.',
     parents=[upload_opts, run_opts, arv_cmd.retry_opt])
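
The two new flags are mutually exclusive. As a quick illustration (hypothetical paths and dates):

    arv-put --trash-at 2100-01-01T12:00 ./mydata    # absolute ISO 8601 date/time
    arv-put --trash-after 30 ./mydata               # relative: days after the upload finishes
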
@@ -430,7 +444,8 @@ class ArvPutUploadJob(object):
                  put_threads=None, replication_desired=None, filename=None,
                  update_time=60.0, update_collection=None, storage_classes=None,
                  logger=logging.getLogger('arvados.arv_put'), dry_run=False,
-                 follow_links=True, exclude_paths=[], exclude_names=None):
+                 follow_links=True, exclude_paths=[], exclude_names=None,
+                 trash_at=None):
         self.paths = paths
         self.resume = resume
         self.use_cache = use_cache
@@ -470,6 +485,13 @@ class ArvPutUploadJob(object):
         self.follow_links = follow_links
         self.exclude_paths = exclude_paths
         self.exclude_names = exclude_names
+        self._trash_at = trash_at
+
+        if self._trash_at is not None:
+            if type(self._trash_at) not in [datetime.datetime, datetime.timedelta]:
+                raise TypeError('trash_at must be None, a timezone-naive datetime or a timedelta')
+            if type(self._trash_at) == datetime.datetime and self._trash_at.tzinfo is not None:
+                raise TypeError('provided trash_at datetime must be timezone-naive')
 
         if not self.use_cache and self.resume:
             raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')
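
A minimal sketch of the two trash_at forms the constructor now accepts (import path follows this file's location; paths are hypothetical):

    import datetime
    from arvados.commands.put import ArvPutUploadJob

    # Absolute: a timezone-naive datetime, treated as UTC at save time.
    job = ArvPutUploadJob(['/tmp/data'], trash_at=datetime.datetime(2100, 1, 1, 12, 0))
    # Relative: a timedelta, resolved against utcnow() when the collection is saved.
    job = ArvPutUploadJob(['/tmp/data'], trash_at=datetime.timedelta(days=30))
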
@@ -610,6 +632,17 @@ class ArvPutUploadJob(object):
             if self.use_cache:
                 self._cache_file.close()
 
+    def _collection_trash_at(self):
+        """
+        Returns the trash date that the collection should use at save time,
+        resolving a relative (timedelta) trash_at value into an absolute
+        datetime.
+        """
+        if type(self._trash_at) == datetime.timedelta:
+            # Get an absolute datetime for trash_at
+            return datetime.datetime.utcnow() + self._trash_at
+        return self._trash_at
+
     def save_collection(self):
         if self.update:
             # Check if files should be updated on the remote collection.
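
For a relative value, the resolution done by _collection_trash_at() at save time amounts to (worked sketch):

    import datetime
    trash_at = datetime.timedelta(days=7)
    absolute = datetime.datetime.utcnow() + trash_at  # naive UTC datetime sent to the API
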
@@ -625,7 +658,8 @@ class ArvPutUploadJob(object):
                     # The file already exists on the remote collection; skip it.
                     pass
             self._remote_collection.save(storage_classes=self.storage_classes,
-                                         num_retries=self.num_retries)
+                                         num_retries=self.num_retries,
+                                         trash_at=self._collection_trash_at())
         else:
             if self.storage_classes is None:
                 self.storage_classes = ['default']
@@ -633,7 +667,8 @@ class ArvPutUploadJob(object):
                 name=self.name, owner_uuid=self.owner_uuid,
                 storage_classes=self.storage_classes,
                 ensure_unique_name=self.ensure_unique_name,
-                num_retries=self.num_retries)
+                num_retries=self.num_retries,
+                trash_at=self._collection_trash_at())
 
     def destroy_cache(self):
         if self.use_cache:
@@ -688,6 +723,15 @@ class ArvPutUploadJob(object):
                     self._save_state()
                 except Exception as e:
                     self.logger.error("Unexpected error trying to save cache file: {}".format(e))
+            # Keep remote collection's trash_at attribute synced when using relative expire dates
+            if self._remote_collection is not None and type(self._trash_at) == datetime.timedelta:
+                try:
+                    self._api_client.collections().update(
+                        uuid=self._remote_collection.manifest_locator(),
+                        body={'trash_at': self._collection_trash_at().strftime("%Y-%m-%dT%H:%M:%S.%fZ")}
+                    ).execute(num_retries=self.num_retries)
+                except Exception as e:
+                    self.logger.error("Unexpected error trying to update remote collection's expire date: {}".format(e))
         else:
             self.bytes_written = self.bytes_skipped
         # Call the reporter, if any
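
The update call above serializes the resolved datetime into the UTC timestamp format used throughout this change; for example:

    import datetime
    datetime.datetime(2100, 1, 1, 12, 0).strftime("%Y-%m-%dT%H:%M:%S.%fZ")
    # -> '2100-01-01T12:00:00.000000Z'
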
@@ -958,6 +1002,9 @@ class ArvPutUploadJob(object):
     def collection_name(self):
         return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None
 
+    def collection_trash_at(self):
+        """Return the collection's trash_at datetime, or None if unset."""
+        return self._my_collection().get_trash_at()
+
     def manifest_locator(self):
         return self._my_collection().manifest_locator()
 
@@ -1073,6 +1120,44 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr,
     if install_sig_handlers:
         arv_cmd.install_signal_handlers()
 
+    # Trash arguments validation
+    trash_at = None
+    if args.trash_at is not None:
+        # ciso8601 considers YYYYMM as invalid but YYYY-MM as valid, so here we
+        # make sure the user provides a complete YYYY-MM-DD date.
+        if not re.match(r'^\d{4}(?P<dash>-?)\d{2}(?P=dash)\d{2}', args.trash_at):
+            logger.error("--trash-at argument format invalid, use --help to see examples.")
+            sys.exit(1)
+        # Check if no time information was provided. In that case, assume end-of-day.
+        if re.match(r'^\d{4}(?P<dash>-?)\d{2}(?P=dash)\d{2}$', args.trash_at):
+            args.trash_at += 'T23:59:59'
+        try:
+            trash_at = ciso8601.parse_datetime(args.trash_at)
+        except ValueError:
+            logger.error("--trash-at argument format invalid, use --help to see examples.")
+            sys.exit(1)
+        else:
+            if trash_at.tzinfo is not None:
+                # Timezone aware datetime provided.
+                utcoffset = -trash_at.utcoffset()
+            else:
+                # Timezone-naive datetime provided; assume it's local.
+                if time.daylight:
+                    utcoffset = datetime.timedelta(seconds=time.altzone)
+                else:
+                    utcoffset = datetime.timedelta(seconds=time.timezone)
+            # Convert to UTC timezone naive datetime.
+            trash_at = trash_at.replace(tzinfo=None) + utcoffset
+
+        if trash_at <= datetime.datetime.utcnow():
+            logger.error("--trash-at argument must be set in the future")
+            sys.exit(1)
+    if args.trash_after is not None:
+        if args.trash_after < 1:
+            logger.error("--trash-after argument must be >= 1")
+            sys.exit(1)
+        trash_at = datetime.timedelta(days=args.trash_after)
+
     # Determine the name to use
     if args.name:
         if args.stream or args.raw:
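
Two details of the validation above are worth illustrating: the regex pre-check that rejects partial dates before ciso8601 sees them, and the naive-local-to-UTC shift (the offset below assumes a UTC-5 system, where time.timezone == 18000):

    import re, time, datetime
    date_re = r'^\d{4}(?P<dash>-?)\d{2}(?P=dash)\d{2}'
    assert re.match(date_re, '2100-01-01')   # complete date: accepted
    assert not re.match(date_re, '2100-01')  # partial date: rejected early
    # A timezone-naive local datetime is converted to naive UTC by adding the
    # system's UTC offset, e.g.:
    # trash_at_utc = naive_local + datetime.timedelta(seconds=time.timezone)
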
@@ -1178,7 +1263,8 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr,
                                  dry_run=args.dry_run,
                                  follow_links=args.follow_links,
                                  exclude_paths=exclude_paths,
-                                 exclude_names=exclude_names)
+                                 exclude_names=exclude_names,
+                                 trash_at=trash_at)
     except ResumeCacheConflict:
         logger.error("\n".join([
             "arv-put: Another process is already uploading this data.",
@@ -1192,7 +1278,7 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr,
             "         --no-resume to start a new resume cache.",
             "         --no-cache to disable resume cache."]))
         sys.exit(1)
-    except CollectionUpdateError as error:
+    except (CollectionUpdateError, PathDoesNotExistError) as error:
         logger.error("\n".join([
             "arv-put: %s" % str(error)]))
         sys.exit(1)
@@ -1202,10 +1288,6 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr,
     except ArvPutUploadNotPending:
         # No files pending for upload
         sys.exit(0)
-    except PathDoesNotExistError as error:
-        logger.error("\n".join([
-            "arv-put: %s" % str(error)]))
-        sys.exit(1)
 
     if not args.dry_run and not args.update_collection and args.resume and writer.bytes_written > 0:
         logger.warning("\n".join([
@@ -1234,10 +1316,21 @@ def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr,
         output = ','.join(writer.data_locators())
     else:
         try:
+            expiration_notice = ""
+            if writer.collection_trash_at() is not None:
+                # Get the local timezone-naive version, and log it with timezone information.
+                if time.daylight:
+                    local_trash_at = writer.collection_trash_at().replace(tzinfo=None) - datetime.timedelta(seconds=time.altzone)
+                else:
+                    local_trash_at = writer.collection_trash_at().replace(tzinfo=None) - datetime.timedelta(seconds=time.timezone)
+                expiration_notice = ". It will expire on {} {}.".format(
+                    local_trash_at.strftime("%Y-%m-%d %H:%M:%S"), time.strftime("%z"))
             if args.update_collection:
-                logger.info(u"Collection updated: '{}'".format(writer.collection_name()))
+                logger.info(u"Collection updated: '{}'{}".format(
+                    writer.collection_name(), expiration_notice))
             else:
-                logger.info(u"Collection saved as '{}'".format(writer.collection_name()))
+                logger.info(u"Collection saved as '{}'{}".format(
+                    writer.collection_name(), expiration_notice))
             if args.portable_data_hash:
                 output = writer.portable_data_hash()
             else:
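
With a trash date set, the resulting log line looks something like (sample values):

    INFO: Collection saved as 'mydata'. It will expire on 2100-01-01 09:00:00 -0300.
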
index 540e06c6c6a0d571e7a269e5eae7c9e8a1989419..42adf2450d2d5a16cd499f342c3f2f243eefdb75 100644 (file)
@@ -12,6 +12,7 @@ from builtins import str
 from builtins import range
 from functools import partial
 import apiclient
+import ciso8601
 import datetime
 import hashlib
 import json
@@ -212,7 +213,7 @@ class ArvadosPutResumeCacheTest(ArvadosBaseTestCase):
 
     def test_cache_is_locked(self):
         with tempfile.NamedTemporaryFile() as cachefile:
-            cache = arv_put.ResumeCache(cachefile.name)
+            _ = arv_put.ResumeCache(cachefile.name)
             self.assertRaises(arv_put.ResumeCacheConflict,
                               arv_put.ResumeCache, cachefile.name)
 
@@ -311,7 +312,7 @@ class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
     def test_passing_nonexistant_path_raise_exception(self):
         uuid_str = str(uuid.uuid4())
         with self.assertRaises(arv_put.PathDoesNotExistError):
-            cwriter = arv_put.ArvPutUploadJob(["/this/path/does/not/exist/{}".format(uuid_str)])
+            arv_put.ArvPutUploadJob(["/this/path/does/not/exist/{}".format(uuid_str)])
 
     def test_writer_works_without_cache(self):
         cwriter = arv_put.ArvPutUploadJob(['/dev/null'], resume=False)
@@ -843,7 +844,7 @@ class ArvadosPutTest(run_test_server.TestCaseWithServers,
             fake_httplib2_response(403), b'{}')
         with mock.patch('arvados.collection.Collection.save_new',
                         new=coll_save_mock):
-            with self.assertRaises(SystemExit) as exc_test:
+            with self.assertRaises(SystemExit):
                 self.call_main_with_args(['/dev/null'])
             self.assertRegex(
                 self.main_stderr.getvalue(), matcher)
@@ -918,7 +919,7 @@ class ArvPutIntegrationTest(run_test_server.TestCaseWithServers,
         BAD_UUID = 'zzzzz-tpzed-zzzzzzzzzzzzzzz'
         self.authorize_with('active')
         with self.assertRaises(apiclient.errors.HttpError):
-            result = arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID,
-                                                  0)
+            arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID, 0)
 
     def test_short_put_from_stdin(self):
@@ -979,7 +980,7 @@ class ArvPutIntegrationTest(run_test_server.TestCaseWithServers,
         # we're about to create is not present in our test fixture.
         manifest_uuid = "00b4e9f40ac4dd432ef89749f1c01e74+47"
         with self.assertRaises(apiclient.errors.HttpError):
-            notfound = arv_put.api_client.collections().get(
+            arv_put.api_client.collections().get(
                 uuid=manifest_uuid).execute()
 
         datadir = self.make_tmpdir()
@@ -990,7 +991,7 @@ class ArvPutIntegrationTest(run_test_server.TestCaseWithServers,
                              stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE,
                              env=self.ENVIRON)
-        (out, err) = p.communicate()
+        (_, err) = p.communicate()
         self.assertRegex(err.decode(), r'INFO: Collection saved as ')
         self.assertEqual(p.returncode, 0)
 
@@ -1030,7 +1031,7 @@ class ArvPutIntegrationTest(run_test_server.TestCaseWithServers,
                              stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE,
                              env=self.ENVIRON)
-        (out, err) = p.communicate()
+        (_, err) = p.communicate()
         self.assertRegex(err.decode(), r'INFO: Creating new cache file at ')
         self.assertEqual(p.returncode, 0)
         cache_filepath = re.search(r'INFO: Creating new cache file at (.*)',
@@ -1053,7 +1054,7 @@ class ArvPutIntegrationTest(run_test_server.TestCaseWithServers,
                              stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE,
                              env=self.ENVIRON)
-        (out, err) = p.communicate()
+        (_, err) = p.communicate()
         self.assertRegex(
             err.decode(),
             r'INFO: Cache expired, starting from scratch.*')
@@ -1069,7 +1070,7 @@ class ArvPutIntegrationTest(run_test_server.TestCaseWithServers,
                              stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE,
                              env=self.ENVIRON)
-        (out, err) = p.communicate()
+        (_, err) = p.communicate()
         self.assertRegex(err.decode(), r'INFO: Creating new cache file at ')
         self.assertEqual(p.returncode, 0)
         cache_filepath = re.search(r'INFO: Creating new cache file at (.*)',
@@ -1091,7 +1092,7 @@ class ArvPutIntegrationTest(run_test_server.TestCaseWithServers,
                              stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE,
                              env=self.ENVIRON)
-        (out, err) = p.communicate()
+        (_, err) = p.communicate()
         self.assertRegex(
             err.decode(),
             r'ERROR: arv-put: Resume cache contains invalid signature.*')
@@ -1111,7 +1112,7 @@ class ArvPutIntegrationTest(run_test_server.TestCaseWithServers,
                              stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE,
                              env=self.ENVIRON)
-        (out, err) = p.communicate()
+        (_, err) = p.communicate()
         self.assertRegex(err.decode(), r'INFO: Creating new cache file at ')
         self.assertEqual(p.returncode, 0)
         cache_filepath = re.search(r'INFO: Creating new cache file at (.*)',
@@ -1135,7 +1136,7 @@ class ArvPutIntegrationTest(run_test_server.TestCaseWithServers,
                              stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE,
                              env=self.ENVIRON)
-        (out, err) = p.communicate()
+        (_, err) = p.communicate()
         self.assertRegex(
             err.decode(),
             r'WARNING: Uploaded file \'.*barfile.txt\' access token expired, will re-upload it from scratch')
@@ -1160,6 +1161,107 @@ class ArvPutIntegrationTest(run_test_server.TestCaseWithServers,
         c = arv_put.api_client.collections().get(uuid=updated_col['uuid']).execute()
         self.assertRegex(c['manifest_text'], r'^\..* .*:44:file2\n')
 
+    def test_put_collection_with_utc_expiring_datetime(self):
+        tmpdir = self.make_tmpdir()
+        trash_at = (datetime.datetime.utcnow() + datetime.timedelta(days=90)).strftime('%Y%m%dT%H%MZ')
+        with open(os.path.join(tmpdir, 'file1'), 'w') as f:
+            f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
+        col = self.run_and_find_collection(
+            "",
+            ['--no-progress', '--trash-at', trash_at, tmpdir])
+        self.assertNotEqual(None, col['uuid'])
+        c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
+        self.assertEqual(ciso8601.parse_datetime(trash_at),
+            ciso8601.parse_datetime(c['trash_at']))
+
+    def test_put_collection_with_timezone_aware_expiring_datetime(self):
+        tmpdir = self.make_tmpdir()
+        trash_at = (datetime.datetime.utcnow() + datetime.timedelta(days=90)).strftime('%Y%m%dT%H%M-0300')
+        with open(os.path.join(tmpdir, 'file1'), 'w') as f:
+            f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
+        col = self.run_and_find_collection(
+            "",
+            ['--no-progress', '--trash-at', trash_at, tmpdir])
+        self.assertNotEqual(None, col['uuid'])
+        c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
+        self.assertEqual(
+            ciso8601.parse_datetime(trash_at).replace(tzinfo=None) + datetime.timedelta(hours=3),
+            ciso8601.parse_datetime(c['trash_at']).replace(tzinfo=None))
+
+    def test_put_collection_with_timezone_naive_expiring_datetime(self):
+        tmpdir = self.make_tmpdir()
+        trash_at = (datetime.datetime.utcnow() + datetime.timedelta(days=90)).strftime('%Y%m%dT%H%M')
+        with open(os.path.join(tmpdir, 'file1'), 'w') as f:
+            f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
+        col = self.run_and_find_collection(
+            "",
+            ['--no-progress', '--trash-at', trash_at, tmpdir])
+        self.assertNotEqual(None, col['uuid'])
+        c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
+        if time.daylight:
+            offset = datetime.timedelta(seconds=time.altzone)
+        else:
+            offset = datetime.timedelta(seconds=time.timezone)
+        self.assertEqual(
+            ciso8601.parse_datetime(trash_at) + offset,
+            ciso8601.parse_datetime(c['trash_at']).replace(tzinfo=None))
+
+    def test_put_collection_with_expiring_date_only(self):
+        tmpdir = self.make_tmpdir()
+        trash_at = '2140-01-01'
+        end_of_day = datetime.timedelta(hours=23, minutes=59, seconds=59)
+        with open(os.path.join(tmpdir, 'file1'), 'w') as f:
+            f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
+        col = self.run_and_find_collection(
+            "",
+            ['--no-progress', '--trash-at', trash_at, tmpdir])
+        self.assertNotEqual(None, col['uuid'])
+        c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
+        if time.daylight:
+            offset = datetime.timedelta(seconds=time.altzone)
+        else:
+            offset = datetime.timedelta(seconds=time.timezone)
+        self.assertEqual(
+            ciso8601.parse_datetime(trash_at) + end_of_day + offset,
+            ciso8601.parse_datetime(c['trash_at']).replace(tzinfo=None))
+
+    def test_put_collection_with_invalid_absolute_expiring_datetimes(self):
+        cases = ['2100', '210010', '2100-10', '2100-Oct']
+        tmpdir = self.make_tmpdir()
+        with open(os.path.join(tmpdir, 'file1'), 'w') as f:
+            f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
+        for test_datetime in cases:
+            with self.assertRaises(AssertionError):
+                self.run_and_find_collection(
+                    "",
+                    ['--no-progress', '--trash-at', test_datetime, tmpdir])
+
+    def test_put_collection_with_relative_expiring_datetime(self):
+        expire_after = 7
+        dt_before = datetime.datetime.utcnow() + datetime.timedelta(days=expire_after)
+        tmpdir = self.make_tmpdir()
+        with open(os.path.join(tmpdir, 'file1'), 'w') as f:
+            f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
+        col = self.run_and_find_collection(
+            "",
+            ['--no-progress', '--trash-after', str(expire_after), tmpdir])
+        self.assertNotEqual(None, col['uuid'])
+        dt_after = datetime.datetime.utcnow() + datetime.timedelta(days=expire_after)
+        c = arv_put.api_client.collections().get(uuid=col['uuid']).execute()
+        trash_at = ciso8601.parse_datetime(c['trash_at']).replace(tzinfo=None)
+        self.assertTrue(dt_before < trash_at)
+        self.assertTrue(dt_after > trash_at)
+
+    def test_put_collection_with_invalid_relative_expiring_datetime(self):
+        expire_after = 0  # Must be >= 1
+        tmpdir = self.make_tmpdir()
+        with open(os.path.join(tmpdir, 'file1'), 'w') as f:
+            f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
+        with self.assertRaises(AssertionError):
+            self.run_and_find_collection(
+                "",
+                ['--no-progress', '--trash-after', str(expire_after), tmpdir])
+
     def test_upload_directory_reference_without_trailing_slash(self):
         tmpdir1 = self.make_tmpdir()
         tmpdir2 = self.make_tmpdir()