package main import ( "container/list" "context" "testing" "time" ) type TrashWorkerTestData struct { Locator1 string Block1 []byte BlockMtime1 int64 Locator2 string Block2 []byte BlockMtime2 int64 CreateData bool CreateInVolume1 bool UseTrashLifeTime bool DifferentMtimes bool DeleteLocator string ExpectLocator1 bool ExpectLocator2 bool } /* Delete block that does not exist in any of the keep volumes. Expect no errors. */ func TestTrashWorkerIntegration_GetNonExistingLocator(t *testing.T) { theConfig.EnableDelete = true testData := TrashWorkerTestData{ Locator1: "5d41402abc4b2a76b9719d911017c592", Block1: []byte("hello"), Locator2: "5d41402abc4b2a76b9719d911017c592", Block2: []byte("hello"), CreateData: false, DeleteLocator: "5d41402abc4b2a76b9719d911017c592", ExpectLocator1: false, ExpectLocator2: false, } performTrashWorkerTest(testData, t) } /* Delete a block that exists on volume 1 of the keep servers. Expect the second locator in volume 2 to be unaffected. */ func TestTrashWorkerIntegration_LocatorInVolume1(t *testing.T) { theConfig.EnableDelete = true testData := TrashWorkerTestData{ Locator1: TestHash, Block1: TestBlock, Locator2: TestHash2, Block2: TestBlock2, CreateData: true, DeleteLocator: TestHash, // first locator ExpectLocator1: false, ExpectLocator2: true, } performTrashWorkerTest(testData, t) } /* Delete a block that exists on volume 2 of the keep servers. Expect the first locator in volume 1 to be unaffected. */ func TestTrashWorkerIntegration_LocatorInVolume2(t *testing.T) { theConfig.EnableDelete = true testData := TrashWorkerTestData{ Locator1: TestHash, Block1: TestBlock, Locator2: TestHash2, Block2: TestBlock2, CreateData: true, DeleteLocator: TestHash2, // locator 2 ExpectLocator1: true, ExpectLocator2: false, } performTrashWorkerTest(testData, t) } /* Delete a block with matching mtime for locator in both volumes. Expect locator to be deleted from both volumes. */ func TestTrashWorkerIntegration_LocatorInBothVolumes(t *testing.T) { theConfig.EnableDelete = true testData := TrashWorkerTestData{ Locator1: TestHash, Block1: TestBlock, Locator2: TestHash, Block2: TestBlock, CreateData: true, DeleteLocator: TestHash, ExpectLocator1: false, ExpectLocator2: false, } performTrashWorkerTest(testData, t) } /* Same locator with different Mtimes exists in both volumes. Delete the second and expect the first to be still around. */ func TestTrashWorkerIntegration_MtimeMatchesForLocator1ButNotForLocator2(t *testing.T) { theConfig.EnableDelete = true testData := TrashWorkerTestData{ Locator1: TestHash, Block1: TestBlock, Locator2: TestHash, Block2: TestBlock, CreateData: true, DifferentMtimes: true, DeleteLocator: TestHash, ExpectLocator1: true, ExpectLocator2: false, } performTrashWorkerTest(testData, t) } /* Two different locators in volume 1. Delete one of them. Expect the other unaffected. */ func TestTrashWorkerIntegration_TwoDifferentLocatorsInVolume1(t *testing.T) { theConfig.EnableDelete = true testData := TrashWorkerTestData{ Locator1: TestHash, Block1: TestBlock, Locator2: TestHash2, Block2: TestBlock2, CreateData: true, CreateInVolume1: true, DeleteLocator: TestHash, // locator 1 ExpectLocator1: false, ExpectLocator2: true, } performTrashWorkerTest(testData, t) } /* Allow default Trash Life time to be used. Thus, the newly created block will not be deleted because its Mtime is within the trash life time. */ func TestTrashWorkerIntegration_SameLocatorInTwoVolumesWithDefaultTrashLifeTime(t *testing.T) { theConfig.EnableDelete = true testData := TrashWorkerTestData{ Locator1: TestHash, Block1: TestBlock, Locator2: TestHash2, Block2: TestBlock2, CreateData: true, CreateInVolume1: true, UseTrashLifeTime: true, DeleteLocator: TestHash, // locator 1 // Since trash life time is in effect, block won't be deleted. ExpectLocator1: true, ExpectLocator2: true, } performTrashWorkerTest(testData, t) } /* Delete a block with matching mtime for locator in both volumes, but EnableDelete is false, so block won't be deleted. */ func TestTrashWorkerIntegration_DisabledDelete(t *testing.T) { theConfig.EnableDelete = false testData := TrashWorkerTestData{ Locator1: TestHash, Block1: TestBlock, Locator2: TestHash, Block2: TestBlock, CreateData: true, DeleteLocator: TestHash, ExpectLocator1: true, ExpectLocator2: true, } performTrashWorkerTest(testData, t) } /* Perform the test */ func performTrashWorkerTest(testData TrashWorkerTestData, t *testing.T) { // Create Keep Volumes KeepVM = MakeTestVolumeManager(2) defer KeepVM.Close() // Put test content vols := KeepVM.AllWritable() if testData.CreateData { vols[0].Put(context.TODO(), testData.Locator1, testData.Block1) vols[0].Put(context.TODO(), testData.Locator1+".meta", []byte("metadata")) if testData.CreateInVolume1 { vols[0].Put(context.TODO(), testData.Locator2, testData.Block2) vols[0].Put(context.TODO(), testData.Locator2+".meta", []byte("metadata")) } else { vols[1].Put(context.TODO(), testData.Locator2, testData.Block2) vols[1].Put(context.TODO(), testData.Locator2+".meta", []byte("metadata")) } } oldBlockTime := time.Now().Add(-theConfig.BlobSignatureTTL.Duration() - time.Minute) // Create TrashRequest for the test trashRequest := TrashRequest{ Locator: testData.DeleteLocator, BlockMtime: oldBlockTime.UnixNano(), } // Run trash worker and put the trashRequest on trashq trashList := list.New() trashList.PushBack(trashRequest) trashq = NewWorkQueue() defer trashq.Close() if !testData.UseTrashLifeTime { // Trash worker would not delete block if its Mtime is // within trash life time. Back-date the block to // allow the deletion to succeed. for _, v := range vols { v.(*MockVolume).Timestamps[testData.DeleteLocator] = oldBlockTime if testData.DifferentMtimes { oldBlockTime = oldBlockTime.Add(time.Second) } } } go RunTrashWorker(trashq) // Install gate so all local operations block until we say go gate := make(chan struct{}) for _, v := range vols { v.(*MockVolume).Gate = gate } assertStatusItem := func(k string, expect float64) { if v := getStatusItem("TrashQueue", k); v != expect { t.Errorf("Got %s %v, expected %v", k, v, expect) } } assertStatusItem("InProgress", 0) assertStatusItem("Queued", 0) listLen := trashList.Len() trashq.ReplaceQueue(trashList) // Wait for worker to take request(s) expectEqualWithin(t, time.Second, listLen, func() interface{} { return trashq.Status().InProgress }) // Ensure status.json also reports work is happening assertStatusItem("InProgress", float64(1)) assertStatusItem("Queued", float64(listLen-1)) // Let worker proceed close(gate) // Wait for worker to finish expectEqualWithin(t, time.Second, 0, func() interface{} { return trashq.Status().InProgress }) // Verify Locator1 to be un/deleted as expected buf := make([]byte, BlockSize) size, err := GetBlock(context.TODO(), testData.Locator1, buf, nil) if testData.ExpectLocator1 { if size == 0 || err != nil { t.Errorf("Expected Locator1 to be still present: %s", testData.Locator1) } } else { if size > 0 || err == nil { t.Errorf("Expected Locator1 to be deleted: %s", testData.Locator1) } } // Verify Locator2 to be un/deleted as expected if testData.Locator1 != testData.Locator2 { size, err = GetBlock(context.TODO(), testData.Locator2, buf, nil) if testData.ExpectLocator2 { if size == 0 || err != nil { t.Errorf("Expected Locator2 to be still present: %s", testData.Locator2) } } else { if size > 0 || err == nil { t.Errorf("Expected Locator2 to be deleted: %s", testData.Locator2) } } } // The DifferentMtimes test puts the same locator in two // different volumes, but only one copy has an Mtime matching // the trash request. if testData.DifferentMtimes { locatorFoundIn := 0 for _, volume := range KeepVM.AllReadable() { buf := make([]byte, BlockSize) if _, err := volume.Get(context.TODO(), testData.Locator1, buf); err == nil { locatorFoundIn = locatorFoundIn + 1 } } if locatorFoundIn != 1 { t.Errorf("Found %d copies of %s, expected 1", locatorFoundIn, testData.Locator1) } } }