+ del(self.writer)
+
+ # Test for bug #11002
+ def test_graceful_exit_while_repacking_small_blocks(self):
+ def wrapped_commit(*args, **kwargs):
+ raise SystemExit("Simulated error")
+
+ with mock.patch('arvados.arvfile._BlockManager.commit_bufferblock',
+ autospec=True) as mocked_commit:
+ mocked_commit.side_effect = wrapped_commit
+ # Upload a little more than 1 block, wrapped_commit will make the first block
+ # commit to fail.
+ # arv-put should not exit with an exception by trying to commit the collection
+ # as it's in an inconsistent state.
+ writer = arv_put.ArvPutUploadJob([self.small_files_dir],
+ replication_desired=1)
+ try:
+ with self.assertRaises(SystemExit):
+ writer.start(save_collection=False)
+ except arvados.arvfile.UnownedBlockError:
+ self.fail("arv-put command is trying to use a corrupted BlockManager. See https://dev.arvados.org/issues/11002")
+ writer.destroy_cache()
+
+ def test_no_resume_when_asked(self):
+ def wrapped_write(*args, **kwargs):
+ data = args[1]
+ # Exit only on last block
+ if len(data) < arvados.config.KEEP_BLOCK_SIZE:
+ # Simulate a checkpoint before quitting.
+ self.writer._update()
+ raise SystemExit("Simulated error")
+ return self.arvfile_write(*args, **kwargs)
+
+ with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
+ autospec=True) as mocked_write:
+ mocked_write.side_effect = wrapped_write
+ writer = arv_put.ArvPutUploadJob([self.large_file_name],
+ replication_desired=1)
+ # We'll be accessing from inside the wrapper
+ self.writer = writer
+ with self.assertRaises(SystemExit):
+ writer.start(save_collection=False)
+ # Confirm that the file was partially uploaded
+ self.assertGreater(writer.bytes_written, 0)
+ self.assertLess(writer.bytes_written,
+ os.path.getsize(self.large_file_name))
+ # Retry the upload, this time without resume
+ writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
+ replication_desired=1,
+ resume=False)
+ writer2.start(save_collection=False)
+ self.assertEqual(writer2.bytes_skipped, 0)
+ self.assertEqual(writer2.bytes_written,
+ os.path.getsize(self.large_file_name))
+ writer2.destroy_cache()
+ del(self.writer)
+
+ def test_no_resume_when_no_cache(self):
+ def wrapped_write(*args, **kwargs):
+ data = args[1]
+ # Exit only on last block
+ if len(data) < arvados.config.KEEP_BLOCK_SIZE:
+ # Simulate a checkpoint before quitting.
+ self.writer._update()
+ raise SystemExit("Simulated error")
+ return self.arvfile_write(*args, **kwargs)
+
+ with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
+ autospec=True) as mocked_write:
+ mocked_write.side_effect = wrapped_write
+ writer = arv_put.ArvPutUploadJob([self.large_file_name],
+ replication_desired=1)
+ # We'll be accessing from inside the wrapper
+ self.writer = writer
+ with self.assertRaises(SystemExit):
+ writer.start(save_collection=False)
+ # Confirm that the file was partially uploaded
+ self.assertGreater(writer.bytes_written, 0)
+ self.assertLess(writer.bytes_written,
+ os.path.getsize(self.large_file_name))
+ # Retry the upload, this time without cache usage
+ writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
+ replication_desired=1,
+ resume=False,
+ use_cache=False)
+ writer2.start(save_collection=False)
+ self.assertEqual(writer2.bytes_skipped, 0)
+ self.assertEqual(writer2.bytes_written,
+ os.path.getsize(self.large_file_name))
+ writer2.destroy_cache()
+ del(self.writer)
+
+ def test_dry_run_feature(self):
+ def wrapped_write(*args, **kwargs):
+ data = args[1]
+ # Exit only on last block
+ if len(data) < arvados.config.KEEP_BLOCK_SIZE:
+ # Simulate a checkpoint before quitting.
+ self.writer._update()
+ raise SystemExit("Simulated error")
+ return self.arvfile_write(*args, **kwargs)
+
+ with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
+ autospec=True) as mocked_write:
+ mocked_write.side_effect = wrapped_write
+ writer = arv_put.ArvPutUploadJob([self.large_file_name],
+ replication_desired=1)
+ # We'll be accessing from inside the wrapper
+ self.writer = writer
+ with self.assertRaises(SystemExit):
+ writer.start(save_collection=False)
+ # Confirm that the file was partially uploaded
+ self.assertGreater(writer.bytes_written, 0)
+ self.assertLess(writer.bytes_written,
+ os.path.getsize(self.large_file_name))
+ with self.assertRaises(arv_put.ArvPutUploadIsPending):
+ # Retry the upload using dry_run to check if there is a pending upload
+ writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
+ replication_desired=1,
+ dry_run=True)
+ # Complete the pending upload
+ writer3 = arv_put.ArvPutUploadJob([self.large_file_name],
+ replication_desired=1)
+ writer3.start(save_collection=False)
+ with self.assertRaises(arv_put.ArvPutUploadNotPending):
+ # Confirm there's no pending upload with dry_run=True
+ writer4 = arv_put.ArvPutUploadJob([self.large_file_name],
+ replication_desired=1,
+ dry_run=True)
+ # Test obvious cases
+ with self.assertRaises(arv_put.ArvPutUploadIsPending):
+ arv_put.ArvPutUploadJob([self.large_file_name],
+ replication_desired=1,
+ dry_run=True,
+ resume=False,
+ use_cache=False)
+ with self.assertRaises(arv_put.ArvPutUploadIsPending):
+ arv_put.ArvPutUploadJob([self.large_file_name],
+ replication_desired=1,
+ dry_run=True,
+ resume=False)
+ del(self.writer)
+
class CachedManifestValidationTest(ArvadosBaseTestCase):
    """Tests for ArvPutUploadJob._cached_manifest_valid()."""

    class MockedPut(arv_put.ArvPutUploadJob):
        """ArvPutUploadJob stand-in that bypasses the real constructor.

        It only sets up a cached state holding ``cached_manifest``, a mocked
        API client and logger, and a retry count.
        """

        def __init__(self, cached_manifest=None):
            import copy
            # Copy EMPTY_STATE: binding the class attribute directly and then
            # writing 'manifest' into it would mutate the shared dict itself,
            # leaking this test's manifest into later users of EMPTY_STATE.
            self._state = copy.deepcopy(arv_put.ArvPutUploadJob.EMPTY_STATE)
            self._state['manifest'] = cached_manifest
            self._api_client = mock.MagicMock()
            self.logger = mock.MagicMock()
            self.num_retries = 1

    def datetime_to_hex(self, dt):
        """Return the POSIX timestamp of naive UTC datetime ``dt`` in hex.

        The result (lowercase, no ``0x`` prefix) fills the ``@%s`` expiry
        slots of self.template's signed locators.
        """
        import calendar
        # calendar.timegm() interprets the tuple as UTC. time.mktime() would
        # read it as local time and shift the timestamp by the host's UTC
        # offset, while the datetimes passed in here come from utcnow().
        return format(calendar.timegm(dt.timetuple()), 'x')

    def setUp(self):
        super(CachedManifestValidationTest, self).setUp()
        self.block1 = "fdba98970961edb29f88241b9d99d890" # foo
        self.block2 = "37b51d194a7513e45b56f6524f2d51f2" # bar
        # Two signed locators; each '%s' receives a hex expiry timestamp.
        self.template = ". "+self.block1+"+3+Asignature@%s "+self.block2+"+3+Anothersignature@%s 0:3:foofile.txt 3:6:barfile.txt\n"

    def test_empty_cached_manifest_is_valid(self):
        """A missing (None) or empty cached manifest counts as valid."""
        put_mock = self.MockedPut()
        self.assertIsNone(put_mock._state.get('manifest'))
        self.assertTrue(put_mock._cached_manifest_valid())
        put_mock._state['manifest'] = ''
        self.assertTrue(put_mock._cached_manifest_valid())

    def test_signature_cases(self):
        """Check validity for combinations of expired/valid block signatures.

        Keep HEAD requests are mocked. A HEAD request must be issued exactly
        once when at least one signature has not expired, and never otherwise.
        """
        now = datetime.datetime.utcnow()
        yesterday = now - datetime.timedelta(days=1)
        lastweek = now - datetime.timedelta(days=7)
        tomorrow = now + datetime.timedelta(days=1)
        nextweek = now + datetime.timedelta(days=7)

        def mocked_head(blocks, loc):
            """Fake KeepClient.head(): succeed only for blocks marked valid."""
            blk = loc.split('+', 1)[0]
            if blocks.get(blk):
                return True
            raise arvados.errors.KeepRequestError("mocked error - block invalid")

        # Block1_expiration, Block2_expiration, Block1_HEAD, Block2_HEAD, Expectation
        cases = [
            # All expired, reset cache - OK
            (yesterday, lastweek, False, False, True),
            (lastweek, yesterday, False, False, True),
            # All non-expired valid blocks - OK
            (tomorrow, nextweek, True, True, True),
            (nextweek, tomorrow, True, True, True),
            # All non-expired invalid blocks - Not OK
            (tomorrow, nextweek, False, False, False),
            (nextweek, tomorrow, False, False, False),
            # One non-expired valid block - OK
            (tomorrow, yesterday, True, False, True),
            (yesterday, tomorrow, False, True, True),
            # One non-expired invalid block - Not OK
            (tomorrow, yesterday, False, False, False),
            (yesterday, tomorrow, False, False, False),
        ]
        for case in cases:
            b1_expiration, b2_expiration, b1_valid, b2_valid, outcome = case
            head_responses = {
                self.block1: b1_valid,
                self.block2: b2_valid,
            }
            cached_manifest = self.template % (
                self.datetime_to_hex(b1_expiration),
                self.datetime_to_hex(b2_expiration),
            )
            arvput = self.MockedPut(cached_manifest)
            with mock.patch('arvados.collection.KeepClient.head') as head_mock:
                head_mock.side_effect = partial(mocked_head, head_responses)
                self.assertEqual(outcome, arvput._cached_manifest_valid(),
                    "Case '%s' should have produced outcome '%s'" % (case, outcome)
                )
                # Compare call_count explicitly: assert_called_once() and
                # assert_not_called() do not exist in older `mock` releases,
                # where calling them on a MagicMock silently passes.
                if b1_expiration > now or b2_expiration > now:
                    # A HEAD request should have been done
                    self.assertEqual(1, head_mock.call_count)
                else:
                    self.assertEqual(0, head_mock.call_count)