Block2 []byte
BlockMtime2 int64
- CreateData bool
- CreateInVolume1 bool
- UseDelayToCreate bool
+ CreateData bool
+ CreateInVolume1 bool
UseTrashLifeTime bool
+ DifferentMtimes bool
DeleteLocator string
Expect no errors.
*/
func TestTrashWorkerIntegration_GetNonExistingLocator(t *testing.T) {
+ neverDelete = false
testData := TrashWorkerTestData{
Locator1: "5d41402abc4b2a76b9719d911017c592",
Block1: []byte("hello"),
Expect the second locator in volume 2 to be unaffected.
*/
func TestTrashWorkerIntegration_LocatorInVolume1(t *testing.T) {
+ neverDelete = false
testData := TrashWorkerTestData{
- Locator1: TEST_HASH,
- Block1: TEST_BLOCK,
+ Locator1: TestHash,
+ Block1: TestBlock,
- Locator2: TEST_HASH_2,
- Block2: TEST_BLOCK_2,
+ Locator2: TestHash2,
+ Block2: TestBlock2,
CreateData: true,
- DeleteLocator: TEST_HASH, // first locator
+ DeleteLocator: TestHash, // first locator
ExpectLocator1: false,
ExpectLocator2: true,
Expect the first locator in volume 1 to be unaffected.
*/
func TestTrashWorkerIntegration_LocatorInVolume2(t *testing.T) {
+ neverDelete = false
testData := TrashWorkerTestData{
- Locator1: TEST_HASH,
- Block1: TEST_BLOCK,
+ Locator1: TestHash,
+ Block1: TestBlock,
- Locator2: TEST_HASH_2,
- Block2: TEST_BLOCK_2,
+ Locator2: TestHash2,
+ Block2: TestBlock2,
CreateData: true,
- DeleteLocator: TEST_HASH_2, // locator 2
+ DeleteLocator: TestHash2, // locator 2
ExpectLocator1: true,
ExpectLocator2: false,
Expect locator to be deleted from both volumes.
*/
func TestTrashWorkerIntegration_LocatorInBothVolumes(t *testing.T) {
+ neverDelete = false
testData := TrashWorkerTestData{
- Locator1: TEST_HASH,
- Block1: TEST_BLOCK,
+ Locator1: TestHash,
+ Block1: TestBlock,
- Locator2: TEST_HASH,
- Block2: TEST_BLOCK,
+ Locator2: TestHash,
+ Block2: TestBlock,
CreateData: true,
- DeleteLocator: TEST_HASH,
+ DeleteLocator: TestHash,
ExpectLocator1: false,
ExpectLocator2: false,
Delete the second and expect the first to be still around.
*/
func TestTrashWorkerIntegration_MtimeMatchesForLocator1ButNotForLocator2(t *testing.T) {
+ neverDelete = false
testData := TrashWorkerTestData{
- Locator1: TEST_HASH,
- Block1: TEST_BLOCK,
+ Locator1: TestHash,
+ Block1: TestBlock,
- Locator2: TEST_HASH,
- Block2: TEST_BLOCK,
+ Locator2: TestHash,
+ Block2: TestBlock,
- CreateData: true,
- UseDelayToCreate: true,
+ CreateData: true,
+ DifferentMtimes: true,
- DeleteLocator: TEST_HASH,
+ DeleteLocator: TestHash,
ExpectLocator1: true,
ExpectLocator2: false,
Expect the other unaffected.
*/
func TestTrashWorkerIntegration_TwoDifferentLocatorsInVolume1(t *testing.T) {
+ neverDelete = false
testData := TrashWorkerTestData{
- Locator1: TEST_HASH,
- Block1: TEST_BLOCK,
+ Locator1: TestHash,
+ Block1: TestBlock,
- Locator2: TEST_HASH_2,
- Block2: TEST_BLOCK_2,
+ Locator2: TestHash2,
+ Block2: TestBlock2,
CreateData: true,
CreateInVolume1: true,
- DeleteLocator: TEST_HASH, // locator 1
+ DeleteLocator: TestHash, // locator 1
ExpectLocator1: false,
ExpectLocator2: true,
will not be deleted becuase its Mtime is within the trash life time.
*/
func TestTrashWorkerIntegration_SameLocatorInTwoVolumesWithDefaultTrashLifeTime(t *testing.T) {
+ neverDelete = false
testData := TrashWorkerTestData{
- Locator1: TEST_HASH,
- Block1: TEST_BLOCK,
+ Locator1: TestHash,
+ Block1: TestBlock,
- Locator2: TEST_HASH_2,
- Block2: TEST_BLOCK_2,
+ Locator2: TestHash2,
+ Block2: TestBlock2,
CreateData: true,
CreateInVolume1: true,
UseTrashLifeTime: true,
- DeleteLocator: TEST_HASH, // locator 1
+ DeleteLocator: TestHash, // locator 1
// Since trash life time is in effect, block won't be deleted.
ExpectLocator1: true,
performTrashWorkerTest(testData, t)
}
+/* Delete a block with matching mtime for locator in both volumes, but neverDelete is true,
+ so block won't be deleted.
+*/
+func TestTrashWorkerIntegration_NeverDelete(t *testing.T) {
+ neverDelete = true
+ testData := TrashWorkerTestData{
+ Locator1: TestHash,
+ Block1: TestBlock,
+
+ Locator2: TestHash,
+ Block2: TestBlock,
+
+ CreateData: true,
+
+ DeleteLocator: TestHash,
+
+ ExpectLocator1: true,
+ ExpectLocator2: true,
+ }
+ performTrashWorkerTest(testData, t)
+}
+
/* Perform the test */
func performTrashWorkerTest(testData TrashWorkerTestData, t *testing.T) {
- actual_permission_ttl := permission_ttl
-
// Create Keep Volumes
KeepVM = MakeTestVolumeManager(2)
defer KeepVM.Close()
vols[0].Put(testData.Locator1, testData.Block1)
vols[0].Put(testData.Locator1+".meta", []byte("metadata"))
- // One of the tests deletes a locator with different Mtimes in two different volumes
- if testData.UseDelayToCreate {
- time.Sleep(1 * time.Second)
- }
-
if testData.CreateInVolume1 {
vols[0].Put(testData.Locator2, testData.Block2)
vols[0].Put(testData.Locator2+".meta", []byte("metadata"))
}
}
+ oldBlockTime := time.Now().Add(-blobSignatureTTL - time.Minute)
+
// Create TrashRequest for the test
trashRequest := TrashRequest{
Locator: testData.DeleteLocator,
- BlockMtime: time.Now().Unix(),
+ BlockMtime: oldBlockTime.Unix(),
}
- // delay by permission_ttl to allow deletes to work
- time.Sleep(1 * time.Second)
-
// Run trash worker and put the trashRequest on trashq
trashList := list.New()
trashList.PushBack(trashRequest)
trashq = NewWorkQueue()
+ defer trashq.Close()
- // Trash worker would not delete block if its Mtime is within trash life time.
- // Hence, we will have to bypass it to allow the deletion to succeed.
if !testData.UseTrashLifeTime {
- permission_ttl = time.Duration(1) * time.Second
+ // Trash worker would not delete block if its Mtime is
+ // within trash life time. Back-date the block to
+ // allow the deletion to succeed.
+ for _, v := range vols {
+ v.(*MockVolume).Timestamps[testData.DeleteLocator] = oldBlockTime
+ if testData.DifferentMtimes {
+ oldBlockTime = oldBlockTime.Add(time.Second)
+ }
+ }
}
go RunTrashWorker(trashq)
+ // Install gate so all local operations block until we say go
+ gate := make(chan struct{})
+ for _, v := range vols {
+ v.(*MockVolume).Gate = gate
+ }
+
+ assertStatusItem := func(k string, expect float64) {
+ if v := getStatusItem("TrashQueue", k); v != expect {
+ t.Errorf("Got %s %v, expected %v", k, v, expect)
+ }
+ }
+
+ assertStatusItem("InProgress", 0)
+ assertStatusItem("Queued", 0)
+
+ listLen := trashList.Len()
trashq.ReplaceQueue(trashList)
- time.Sleep(10 * time.Millisecond) // give a moment to finish processing the list
+
+ // Wait for worker to take request(s)
+ expectEqualWithin(t, time.Second, listLen, func() interface{} { return trashq.Status().InProgress })
+
+ // Ensure status.json also reports work is happening
+ assertStatusItem("InProgress", float64(1))
+ assertStatusItem("Queued", float64(listLen-1))
+
+ // Let worker proceed
+ close(gate)
+
+ // Wait for worker to finish
+ expectEqualWithin(t, time.Second, 0, func() interface{} { return trashq.Status().InProgress })
// Verify Locator1 to be un/deleted as expected
- data, _ := GetBlock(testData.Locator1, false)
+ data, _ := GetBlock(testData.Locator1)
if testData.ExpectLocator1 {
if len(data) == 0 {
t.Errorf("Expected Locator1 to be still present: %s", testData.Locator1)
// Verify Locator2 to be un/deleted as expected
if testData.Locator1 != testData.Locator2 {
- data, _ = GetBlock(testData.Locator2, false)
+ data, _ = GetBlock(testData.Locator2)
if testData.ExpectLocator2 {
if len(data) == 0 {
t.Errorf("Expected Locator2 to be still present: %s", testData.Locator2)
}
}
- // One test used the same locator in two different volumes but with different Mtime values
- // Hence let's verify that only one volume has it and the other is deleted
- if (testData.ExpectLocator1) &&
- (testData.Locator1 == testData.Locator2) {
+ // The DifferentMtimes test puts the same locator in two
+ // different volumes, but only one copy has an Mtime matching
+ // the trash request.
+ if testData.DifferentMtimes {
locatorFoundIn := 0
for _, volume := range KeepVM.AllReadable() {
if _, err := volume.Get(testData.Locator1); err == nil {
}
}
if locatorFoundIn != 1 {
- t.Errorf("Expected locator to be found in only one volume after deleting. But found: %s", locatorFoundIn)
+ t.Errorf("Found %d copies of %s, expected 1", locatorFoundIn, testData.Locator1)
}
}
-
- // Done
- permission_ttl = actual_permission_ttl
- trashq.Close()
}