//
// SPDX-License-Identifier: AGPL-3.0
-package main
+package keepstore
import (
"container/list"
"context"
- "testing"
"time"
+
+ "git.arvados.org/arvados.git/sdk/go/ctxlog"
+ "github.com/prometheus/client_golang/prometheus"
+ check "gopkg.in/check.v1"
)
type TrashWorkerTestData struct {
/* Delete block that does not exist in any of the keep volumes.
Expect no errors.
*/
-func TestTrashWorkerIntegration_GetNonExistingLocator(t *testing.T) {
- theConfig.EnableDelete = true
+func (s *HandlerSuite) TestTrashWorkerIntegration_GetNonExistingLocator(c *check.C) {
+ s.cluster.Collections.BlobTrash = true
testData := TrashWorkerTestData{
Locator1: "5d41402abc4b2a76b9719d911017c592",
Block1: []byte("hello"),
ExpectLocator1: false,
ExpectLocator2: false,
}
- performTrashWorkerTest(testData, t)
+ s.performTrashWorkerTest(c, testData)
}
/* Delete a block that exists on volume 1 of the keep servers.
Expect the second locator in volume 2 to be unaffected.
*/
-func TestTrashWorkerIntegration_LocatorInVolume1(t *testing.T) {
- theConfig.EnableDelete = true
+func (s *HandlerSuite) TestTrashWorkerIntegration_LocatorInVolume1(c *check.C) {
+ s.cluster.Collections.BlobTrash = true
testData := TrashWorkerTestData{
Locator1: TestHash,
Block1: TestBlock,
ExpectLocator1: false,
ExpectLocator2: true,
}
- performTrashWorkerTest(testData, t)
+ s.performTrashWorkerTest(c, testData)
}
/* Delete a block that exists on volume 2 of the keep servers.
Expect the first locator in volume 1 to be unaffected.
*/
-func TestTrashWorkerIntegration_LocatorInVolume2(t *testing.T) {
- theConfig.EnableDelete = true
+func (s *HandlerSuite) TestTrashWorkerIntegration_LocatorInVolume2(c *check.C) {
+ s.cluster.Collections.BlobTrash = true
testData := TrashWorkerTestData{
Locator1: TestHash,
Block1: TestBlock,
ExpectLocator1: true,
ExpectLocator2: false,
}
- performTrashWorkerTest(testData, t)
+ s.performTrashWorkerTest(c, testData)
}
/* Delete a block with matching mtime for locator in both volumes.
Expect locator to be deleted from both volumes.
*/
-func TestTrashWorkerIntegration_LocatorInBothVolumes(t *testing.T) {
- theConfig.EnableDelete = true
+func (s *HandlerSuite) TestTrashWorkerIntegration_LocatorInBothVolumes(c *check.C) {
+ s.cluster.Collections.BlobTrash = true
testData := TrashWorkerTestData{
Locator1: TestHash,
Block1: TestBlock,
ExpectLocator1: false,
ExpectLocator2: false,
}
- performTrashWorkerTest(testData, t)
+ s.performTrashWorkerTest(c, testData)
}
/* Same locator with different Mtimes exists in both volumes.
Delete the second and expect the first to be still around.
*/
-func TestTrashWorkerIntegration_MtimeMatchesForLocator1ButNotForLocator2(t *testing.T) {
- theConfig.EnableDelete = true
+func (s *HandlerSuite) TestTrashWorkerIntegration_MtimeMatchesForLocator1ButNotForLocator2(c *check.C) {
+ s.cluster.Collections.BlobTrash = true
testData := TrashWorkerTestData{
Locator1: TestHash,
Block1: TestBlock,
ExpectLocator1: true,
ExpectLocator2: false,
}
- performTrashWorkerTest(testData, t)
+ s.performTrashWorkerTest(c, testData)
}
// Delete a block that exists on both volumes with matching mtimes,
// but specify a MountUUID in the request so it only gets deleted from
// the first volume.
-func TestTrashWorkerIntegration_SpecifyMountUUID(t *testing.T) {
- theConfig.EnableDelete = true
+func (s *HandlerSuite) TestTrashWorkerIntegration_SpecifyMountUUID(c *check.C) {
+ s.cluster.Collections.BlobTrash = true
testData := TrashWorkerTestData{
Locator1: TestHash,
Block1: TestBlock,
ExpectLocator1: true,
ExpectLocator2: true,
}
- performTrashWorkerTest(testData, t)
+ s.performTrashWorkerTest(c, testData)
}
/* Two different locators in volume 1.
Delete one of them.
Expect the other unaffected.
*/
-func TestTrashWorkerIntegration_TwoDifferentLocatorsInVolume1(t *testing.T) {
- theConfig.EnableDelete = true
+func (s *HandlerSuite) TestTrashWorkerIntegration_TwoDifferentLocatorsInVolume1(c *check.C) {
+ s.cluster.Collections.BlobTrash = true
testData := TrashWorkerTestData{
Locator1: TestHash,
Block1: TestBlock,
ExpectLocator1: false,
ExpectLocator2: true,
}
- performTrashWorkerTest(testData, t)
+ s.performTrashWorkerTest(c, testData)
}
/* Allow default Trash Life time to be used. Thus, the newly created block
will not be deleted because its Mtime is within the trash life time.
*/
-func TestTrashWorkerIntegration_SameLocatorInTwoVolumesWithDefaultTrashLifeTime(t *testing.T) {
- theConfig.EnableDelete = true
+func (s *HandlerSuite) TestTrashWorkerIntegration_SameLocatorInTwoVolumesWithDefaultTrashLifeTime(c *check.C) {
+ s.cluster.Collections.BlobTrash = true
testData := TrashWorkerTestData{
Locator1: TestHash,
Block1: TestBlock,
ExpectLocator1: true,
ExpectLocator2: true,
}
- performTrashWorkerTest(testData, t)
+ s.performTrashWorkerTest(c, testData)
}
/* Delete a block with matching mtime for locator in both volumes, but EnableDelete is false,
so block won't be deleted.
*/
-func TestTrashWorkerIntegration_DisabledDelete(t *testing.T) {
- theConfig.EnableDelete = false
+func (s *HandlerSuite) TestTrashWorkerIntegration_DisabledDelete(c *check.C) {
+ s.cluster.Collections.BlobTrash = false
testData := TrashWorkerTestData{
Locator1: TestHash,
Block1: TestBlock,
ExpectLocator1: true,
ExpectLocator2: true,
}
- performTrashWorkerTest(testData, t)
+ s.performTrashWorkerTest(c, testData)
}
/* Perform the test */
-func performTrashWorkerTest(testData TrashWorkerTestData, t *testing.T) {
- // Create Keep Volumes
- KeepVM = MakeTestVolumeManager(2)
- defer KeepVM.Close()
+func (s *HandlerSuite) performTrashWorkerTest(c *check.C, testData TrashWorkerTestData) {
+ c.Assert(s.handler.setup(context.Background(), s.cluster, "", prometheus.NewRegistry(), testServiceURL), check.IsNil)
+ // Replace the router's trashq -- which the worker goroutines
+ // started by setup() are now receiving from -- with a new
+ // one, so we can see what the handler sends to it.
+ trashq := NewWorkQueue()
+ s.handler.Handler.(*router).trashq = trashq
// Put test content
- vols := KeepVM.AllWritable()
+ mounts := s.handler.volmgr.AllWritable()
if testData.CreateData {
- vols[0].Put(context.Background(), testData.Locator1, testData.Block1)
- vols[0].Put(context.Background(), testData.Locator1+".meta", []byte("metadata"))
+ mounts[0].Put(context.Background(), testData.Locator1, testData.Block1)
+ mounts[0].Put(context.Background(), testData.Locator1+".meta", []byte("metadata"))
if testData.CreateInVolume1 {
- vols[0].Put(context.Background(), testData.Locator2, testData.Block2)
- vols[0].Put(context.Background(), testData.Locator2+".meta", []byte("metadata"))
+ mounts[0].Put(context.Background(), testData.Locator2, testData.Block2)
+ mounts[0].Put(context.Background(), testData.Locator2+".meta", []byte("metadata"))
} else {
- vols[1].Put(context.Background(), testData.Locator2, testData.Block2)
- vols[1].Put(context.Background(), testData.Locator2+".meta", []byte("metadata"))
+ mounts[1].Put(context.Background(), testData.Locator2, testData.Block2)
+ mounts[1].Put(context.Background(), testData.Locator2+".meta", []byte("metadata"))
}
}
- oldBlockTime := time.Now().Add(-theConfig.BlobSignatureTTL.Duration() - time.Minute)
+ oldBlockTime := time.Now().Add(-s.cluster.Collections.BlobSigningTTL.Duration() - time.Minute)
// Create TrashRequest for the test
trashRequest := TrashRequest{
BlockMtime: oldBlockTime.UnixNano(),
}
if testData.SpecifyMountUUID {
- trashRequest.MountUUID = KeepVM.Mounts()[0].UUID
+ trashRequest.MountUUID = s.handler.volmgr.Mounts()[0].UUID
}
// Run trash worker and put the trashRequest on trashq
trashList := list.New()
trashList.PushBack(trashRequest)
- trashq = NewWorkQueue()
- defer trashq.Close()
if !testData.UseTrashLifeTime {
// Trash worker would not delete block if its Mtime is
// within trash life time. Back-date the block to
// allow the deletion to succeed.
- for _, v := range vols {
- v.(*MockVolume).Timestamps[testData.DeleteLocator] = oldBlockTime
+ for _, mnt := range mounts {
+ mnt.Volume.(*MockVolume).Timestamps[testData.DeleteLocator] = oldBlockTime
if testData.DifferentMtimes {
oldBlockTime = oldBlockTime.Add(time.Second)
}
}
}
- go RunTrashWorker(trashq)
+ go RunTrashWorker(s.handler.volmgr, ctxlog.TestLogger(c), s.cluster, trashq)
// Install gate so all local operations block until we say go
gate := make(chan struct{})
- for _, v := range vols {
- v.(*MockVolume).Gate = gate
+ for _, mnt := range mounts {
+ mnt.Volume.(*MockVolume).Gate = gate
}
assertStatusItem := func(k string, expect float64) {
- if v := getStatusItem("TrashQueue", k); v != expect {
- t.Errorf("Got %s %v, expected %v", k, v, expect)
+ if v := getStatusItem(s.handler, "TrashQueue", k); v != expect {
+ c.Errorf("Got %s %v, expected %v", k, v, expect)
}
}
trashq.ReplaceQueue(trashList)
// Wait for worker to take request(s)
- expectEqualWithin(t, time.Second, listLen, func() interface{} { return trashq.Status().InProgress })
+ expectEqualWithin(c, time.Second, listLen, func() interface{} { return trashq.Status().InProgress })
// Ensure status.json also reports work is happening
assertStatusItem("InProgress", float64(1))
close(gate)
// Wait for worker to finish
- expectEqualWithin(t, time.Second, 0, func() interface{} { return trashq.Status().InProgress })
+ expectEqualWithin(c, time.Second, 0, func() interface{} { return trashq.Status().InProgress })
// Verify Locator1 to be un/deleted as expected
buf := make([]byte, BlockSize)
- size, err := GetBlock(context.Background(), testData.Locator1, buf, nil)
+ size, err := GetBlock(context.Background(), s.handler.volmgr, testData.Locator1, buf, nil)
if testData.ExpectLocator1 {
if size == 0 || err != nil {
- t.Errorf("Expected Locator1 to be still present: %s", testData.Locator1)
+ c.Errorf("Expected Locator1 to be still present: %s", testData.Locator1)
}
} else {
if size > 0 || err == nil {
- t.Errorf("Expected Locator1 to be deleted: %s", testData.Locator1)
+ c.Errorf("Expected Locator1 to be deleted: %s", testData.Locator1)
}
}
// Verify Locator2 to be un/deleted as expected
if testData.Locator1 != testData.Locator2 {
- size, err = GetBlock(context.Background(), testData.Locator2, buf, nil)
+ size, err = GetBlock(context.Background(), s.handler.volmgr, testData.Locator2, buf, nil)
if testData.ExpectLocator2 {
if size == 0 || err != nil {
- t.Errorf("Expected Locator2 to be still present: %s", testData.Locator2)
+ c.Errorf("Expected Locator2 to be still present: %s", testData.Locator2)
}
} else {
if size > 0 || err == nil {
- t.Errorf("Expected Locator2 to be deleted: %s", testData.Locator2)
+ c.Errorf("Expected Locator2 to be deleted: %s", testData.Locator2)
}
}
}
// the trash request.
if testData.DifferentMtimes {
locatorFoundIn := 0
- for _, volume := range KeepVM.AllReadable() {
+ for _, volume := range s.handler.volmgr.AllReadable() {
buf := make([]byte, BlockSize)
if _, err := volume.Get(context.Background(), testData.Locator1, buf); err == nil {
locatorFoundIn = locatorFoundIn + 1
}
}
- if locatorFoundIn != 1 {
- t.Errorf("Found %d copies of %s, expected 1", locatorFoundIn, testData.Locator1)
- }
+ c.Check(locatorFoundIn, check.Equals, 1)
}
}