import (
"container/list"
- "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
- "log"
+ "context"
"testing"
"time"
)
Block2 []byte
BlockMtime2 int64
- CreateData bool
- CreateInVolume1 bool
- UseDelayToCreate bool
+ CreateData bool
+ CreateInVolume1 bool
- UseDefaultTrashTime bool
+ UseTrashLifeTime bool
+ DifferentMtimes bool
DeleteLocator string
Expect no errors.
*/
func TestTrashWorkerIntegration_GetNonExistingLocator(t *testing.T) {
+ theConfig.EnableDelete = true
testData := TrashWorkerTestData{
Locator1: "5d41402abc4b2a76b9719d911017c592",
Block1: []byte("hello"),
Expect the second locator in volume 2 to be unaffected.
*/
func TestTrashWorkerIntegration_LocatorInVolume1(t *testing.T) {
+ theConfig.EnableDelete = true
testData := TrashWorkerTestData{
- Locator1: TEST_HASH,
- Block1: TEST_BLOCK,
+ Locator1: TestHash,
+ Block1: TestBlock,
- Locator2: TEST_HASH_2,
- Block2: TEST_BLOCK_2,
+ Locator2: TestHash2,
+ Block2: TestBlock2,
CreateData: true,
- DeleteLocator: TEST_HASH, // first locator
+ DeleteLocator: TestHash, // first locator
ExpectLocator1: false,
ExpectLocator2: true,
Expect the first locator in volume 1 to be unaffected.
*/
func TestTrashWorkerIntegration_LocatorInVolume2(t *testing.T) {
+ theConfig.EnableDelete = true
testData := TrashWorkerTestData{
- Locator1: TEST_HASH,
- Block1: TEST_BLOCK,
+ Locator1: TestHash,
+ Block1: TestBlock,
- Locator2: TEST_HASH_2,
- Block2: TEST_BLOCK_2,
+ Locator2: TestHash2,
+ Block2: TestBlock2,
CreateData: true,
- DeleteLocator: TEST_HASH_2, // locator 2
+ DeleteLocator: TestHash2, // locator 2
ExpectLocator1: true,
ExpectLocator2: false,
Expect locator to be deleted from both volumes.
*/
func TestTrashWorkerIntegration_LocatorInBothVolumes(t *testing.T) {
+ theConfig.EnableDelete = true
testData := TrashWorkerTestData{
- Locator1: TEST_HASH,
- Block1: TEST_BLOCK,
+ Locator1: TestHash,
+ Block1: TestBlock,
- Locator2: TEST_HASH,
- Block2: TEST_BLOCK,
+ Locator2: TestHash,
+ Block2: TestBlock,
CreateData: true,
- DeleteLocator: TEST_HASH,
+ DeleteLocator: TestHash,
ExpectLocator1: false,
ExpectLocator2: false,
Delete the second and expect the first to be still around.
*/
func TestTrashWorkerIntegration_MtimeMatchesForLocator1ButNotForLocator2(t *testing.T) {
+ theConfig.EnableDelete = true
testData := TrashWorkerTestData{
- Locator1: TEST_HASH,
- Block1: TEST_BLOCK,
+ Locator1: TestHash,
+ Block1: TestBlock,
- Locator2: TEST_HASH,
- Block2: TEST_BLOCK,
+ Locator2: TestHash,
+ Block2: TestBlock,
- CreateData: true,
- UseDelayToCreate: true,
+ CreateData: true,
+ DifferentMtimes: true,
- DeleteLocator: TEST_HASH,
+ DeleteLocator: TestHash,
ExpectLocator1: true,
ExpectLocator2: false,
Expect the other unaffected.
*/
func TestTrashWorkerIntegration_TwoDifferentLocatorsInVolume1(t *testing.T) {
+ theConfig.EnableDelete = true
testData := TrashWorkerTestData{
- Locator1: TEST_HASH,
- Block1: TEST_BLOCK,
+ Locator1: TestHash,
+ Block1: TestBlock,
- Locator2: TEST_HASH_2,
- Block2: TEST_BLOCK_2,
+ Locator2: TestHash2,
+ Block2: TestBlock2,
CreateData: true,
CreateInVolume1: true,
- DeleteLocator: TEST_HASH, // locator 1
+ DeleteLocator: TestHash, // locator 1
ExpectLocator1: false,
ExpectLocator2: true,
performTrashWorkerTest(testData, t)
}
-/* Allow defaultTrashLifetime to be used. Thus, the newly created block
- will not be deleted becuase its Mtime is within the trash life time.
+/* Allow the default trash life time to be used. Thus, the newly created block
+ will not be deleted because its Mtime is within the trash life time.
*/
func TestTrashWorkerIntegration_SameLocatorInTwoVolumesWithDefaultTrashLifeTime(t *testing.T) {
+ theConfig.EnableDelete = true
testData := TrashWorkerTestData{
- Locator1: TEST_HASH,
- Block1: TEST_BLOCK,
+ Locator1: TestHash,
+ Block1: TestBlock,
- Locator2: TEST_HASH_2,
- Block2: TEST_BLOCK_2,
+ Locator2: TestHash2,
+ Block2: TestBlock2,
CreateData: true,
CreateInVolume1: true,
- UseDefaultTrashTime: true,
+ UseTrashLifeTime: true,
+
+ DeleteLocator: TestHash, // locator 1
+
+ // Since trash life time is in effect, block won't be deleted.
+ ExpectLocator1: true,
+ ExpectLocator2: true,
+ }
+ performTrashWorkerTest(testData, t)
+}
+
+/* Delete a block with matching mtime for locator in both volumes, but EnableDelete is false,
+ so block won't be deleted.
+*/
+func TestTrashWorkerIntegration_DisabledDelete(t *testing.T) {
+ theConfig.EnableDelete = false
+ testData := TrashWorkerTestData{
+ Locator1: TestHash,
+ Block1: TestBlock,
+
+ Locator2: TestHash,
+ Block2: TestBlock,
- DeleteLocator: TEST_HASH, // locator 1
+ CreateData: true,
+
+ DeleteLocator: TestHash,
- // Since defaultTrashLifetime is in effect, block won't be deleted.
ExpectLocator1: true,
ExpectLocator2: true,
}
// performTrashWorkerTest runs one trash-worker scenario described by
// testData. It builds a volume manager with MakeTestVolumeManager(2),
// optionally stores the test blocks, queues a single TrashRequest for
// testData.DeleteLocator, starts RunTrashWorker on that queue, and then
// uses GetBlock to check that each locator is present or absent as
// ExpectLocator1 / ExpectLocator2 require. Mismatches are reported
// with t.Errorf, so the function keeps going after a failed check.
func performTrashWorkerTest(testData TrashWorkerTestData, t *testing.T) {
// Create Keep Volumes
KeepVM = MakeTestVolumeManager(2)
-
- // Delete from volume will not take place if the block MTime is within permission_ttl
- permission_ttl = time.Duration(1) * time.Second
+ defer KeepVM.Close()
// Put test content: Locator1 always goes to vols[0]; Locator2 goes to
// vols[0] when CreateInVolume1 is set and to vols[1] otherwise.
- vols := KeepVM.Volumes()
+ vols := KeepVM.AllWritable()
if testData.CreateData {
- vols[0].Put(testData.Locator1, testData.Block1)
- vols[0].Put(testData.Locator1+".meta", []byte("metadata"))
-
- // One of the tests deletes a locator with different Mtimes in two different volumes
- if testData.UseDelayToCreate {
- time.Sleep(1 * time.Second)
- }
+ vols[0].Put(context.Background(), testData.Locator1, testData.Block1)
+ vols[0].Put(context.Background(), testData.Locator1+".meta", []byte("metadata"))
if testData.CreateInVolume1 {
- vols[0].Put(testData.Locator2, testData.Block2)
- vols[0].Put(testData.Locator2+".meta", []byte("metadata"))
+ vols[0].Put(context.Background(), testData.Locator2, testData.Block2)
+ vols[0].Put(context.Background(), testData.Locator2+".meta", []byte("metadata"))
} else {
- vols[1].Put(testData.Locator2, testData.Block2)
- vols[1].Put(testData.Locator2+".meta", []byte("metadata"))
+ vols[1].Put(context.Background(), testData.Locator2, testData.Block2)
+ vols[1].Put(context.Background(), testData.Locator2+".meta", []byte("metadata"))
}
}
// oldBlockTime lies one minute further in the past than the blob
// signature TTL. It is used as the BlockMtime of the trash request
// and, unless UseTrashLifeTime is set, as the back-dated timestamp
// of the stored block.
+ oldBlockTime := time.Now().Add(-theConfig.BlobSignatureTTL.Duration() - time.Minute)
+
// Create TrashRequest for the test
trashRequest := TrashRequest{
Locator: testData.DeleteLocator,
- BlockMtime: time.Now().Unix(),
+ BlockMtime: oldBlockTime.UnixNano(),
}
- // delay by permission_ttl to allow deletes to work
- time.Sleep(1 * time.Second)
-
// Run trash worker and put the trashRequest on trashq
trashList := list.New()
trashList.PushBack(trashRequest)
trashq = NewWorkQueue()
+ defer trashq.Close()
+
+ if !testData.UseTrashLifeTime {
+ // Trash worker would not delete block if its Mtime is
+ // within trash life time. Back-date the block to
+ // allow the deletion to succeed.
+ for _, v := range vols {
+ v.(*MockVolume).Timestamps[testData.DeleteLocator] = oldBlockTime
// With DifferentMtimes, shift the timestamp by one second for
// each subsequent volume so that only the first volume keeps
// the exact time already stored in trashRequest.BlockMtime.
+ if testData.DifferentMtimes {
+ oldBlockTime = oldBlockTime.Add(time.Second)
+ }
+ }
+ }
+ go RunTrashWorker(trashq)
- // Trash worker would not delete block if its Mtime is within defaultTrashLifetime
- // Hence, we will have to bypass it to allow the deletion to succeed.
- if !testData.UseDefaultTrashTime {
- go RunTrashWorker(nil, trashq)
- } else {
- arv, err := arvadosclient.MakeArvadosClient()
- if err != nil {
- log.Fatalf("Error setting up arvados client %s", err.Error())
+ // Install gate so all local operations block until we say go
+ gate := make(chan struct{})
+ for _, v := range vols {
+ v.(*MockVolume).Gate = gate
+ }
+
// assertStatusItem reports a test error when item k of the
// "TrashQueue" status section differs from expect.
+ assertStatusItem := func(k string, expect float64) {
+ if v := getStatusItem("TrashQueue", k); v != expect {
+ t.Errorf("Got %s %v, expected %v", k, v, expect)
}
- go RunTrashWorker(&arv, trashq)
}
// Nothing has been queued yet, so both counters must be zero.
+ assertStatusItem("InProgress", 0)
+ assertStatusItem("Queued", 0)
+
+ listLen := trashList.Len()
trashq.ReplaceQueue(trashList)
- time.Sleep(10 * time.Millisecond) // give a moment to finish processing the list
+
+ // Wait (up to one second) for worker to take request(s)
+ expectEqualWithin(t, time.Second, listLen, func() interface{} { return trashq.Status().InProgress })
+
+ // Ensure status.json also reports work is happening
+ assertStatusItem("InProgress", float64(1))
+ assertStatusItem("Queued", float64(listLen-1))
+
+ // Let worker proceed: closing the gate releases the gated volume operations
+ close(gate)
+
+ // Wait (up to one second) for worker to finish
+ expectEqualWithin(t, time.Second, 0, func() interface{} { return trashq.Status().InProgress })
// Verify Locator1 to be un/deleted as expected
- data, _ := GetBlock(testData.Locator1, false)
// buf is reused for the Locator2 lookup below.
+ buf := make([]byte, BlockSize)
+ size, err := GetBlock(context.Background(), testData.Locator1, buf, nil)
if testData.ExpectLocator1 {
- if len(data) == 0 {
+ if size == 0 || err != nil {
t.Errorf("Expected Locator1 to be still present: %s", testData.Locator1)
}
} else {
- if len(data) > 0 {
+ if size > 0 || err == nil {
t.Errorf("Expected Locator1 to be deleted: %s", testData.Locator1)
}
}
// Verify Locator2 to be un/deleted as expected
// (skipped when both locators are the same, since the Locator1
// check above already covered that hash).
if testData.Locator1 != testData.Locator2 {
- data, _ = GetBlock(testData.Locator2, false)
+ size, err = GetBlock(context.Background(), testData.Locator2, buf, nil)
if testData.ExpectLocator2 {
- if len(data) == 0 {
+ if size == 0 || err != nil {
t.Errorf("Expected Locator2 to be still present: %s", testData.Locator2)
}
} else {
- if len(data) > 0 {
+ if size > 0 || err == nil {
t.Errorf("Expected Locator2 to be deleted: %s", testData.Locator2)
}
}
}
- // One test used the same locator in two different volumes but with different Mtime values
- // Hence let's verify that only one volume has it and the other is deleted
- if (testData.ExpectLocator1) &&
- (testData.Locator1 == testData.Locator2) {
+ // The DifferentMtimes test puts the same locator in two
+ // different volumes, but only one copy has an Mtime matching
+ // the trash request. Count the readable volumes that still
+ // return the block and require exactly one.
+ if testData.DifferentMtimes {
locatorFoundIn := 0
- for _, volume := range KeepVM.Volumes() {
- if _, err := volume.Get(testData.Locator1); err == nil {
+ for _, volume := range KeepVM.AllReadable() {
+ buf := make([]byte, BlockSize)
+ if _, err := volume.Get(context.Background(), testData.Locator1, buf); err == nil {
locatorFoundIn = locatorFoundIn + 1
}
}
if locatorFoundIn != 1 {
- t.Errorf("Expected locator to be found in only one volume after deleting. But found: %s", locatorFoundIn)
+ t.Errorf("Found %d copies of %s, expected 1", locatorFoundIn, testData.Locator1)
}
}
-
- // Done
- trashq.Close()
- KeepVM.Quit()
}