X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/0f5b0542513b572959e39400bae42e69aeb1a7b6..387d86217ab0f119285c12735a6d0f3e606c23a1:/services/keepstore/trash_worker_test.go

diff --git a/services/keepstore/trash_worker_test.go b/services/keepstore/trash_worker_test.go
index a1648c52cc..0c304dbade 100644
--- a/services/keepstore/trash_worker_test.go
+++ b/services/keepstore/trash_worker_test.go
@@ -5,355 +5,198 @@
 package keepstore
 
 import (
-	"container/list"
 	"context"
+	"crypto/md5"
+	"encoding/json"
+	"fmt"
+	"net/http"
+	"sort"
 	"time"
 
-	"git.arvados.org/arvados.git/sdk/go/ctxlog"
-	"github.com/prometheus/client_golang/prometheus"
-	check "gopkg.in/check.v1"
+	"git.arvados.org/arvados.git/sdk/go/arvados"
+	. "gopkg.in/check.v1"
 )
 
-type TrashWorkerTestData struct {
-	Locator1    string
-	Block1      []byte
-	BlockMtime1 int64
-
-	Locator2    string
-	Block2      []byte
-	BlockMtime2 int64
-
-	CreateData      bool
-	CreateInVolume1 bool
-
-	UseTrashLifeTime bool
-	DifferentMtimes  bool
-
-	DeleteLocator    string
-	SpecifyMountUUID bool
-
-	ExpectLocator1 bool
-	ExpectLocator2 bool
-}
-
-// Delete block that does not exist in any of the keep volumes.
-// Expect no errors.
-func (s *HandlerSuite) TestTrashWorkerIntegration_GetNonExistingLocator(c *check.C) {
-	s.cluster.Collections.BlobTrash = true
-	testData := TrashWorkerTestData{
-		Locator1: "5d41402abc4b2a76b9719d911017c592",
-		Block1:   []byte("hello"),
-
-		Locator2: "5d41402abc4b2a76b9719d911017c592",
-		Block2:   []byte("hello"),
-
-		CreateData: false,
-
-		DeleteLocator: "5d41402abc4b2a76b9719d911017c592",
-
-		ExpectLocator1: false,
-		ExpectLocator2: false,
-	}
-	s.performTrashWorkerTest(c, testData)
-}
-
-// Delete a block that exists on volume 1 of the keep servers. Expect
-// the second locator in volume 2 to be unaffected.
-func (s *HandlerSuite) TestTrashWorkerIntegration_LocatorInVolume1(c *check.C) {
-	s.cluster.Collections.BlobTrash = true
-	testData := TrashWorkerTestData{
-		Locator1: TestHash,
-		Block1:   TestBlock,
-
-		Locator2: TestHash2,
-		Block2:   TestBlock2,
-
-		CreateData: true,
-
-		DeleteLocator: TestHash, // first locator
-
-		ExpectLocator1: false,
-		ExpectLocator2: true,
-	}
-	s.performTrashWorkerTest(c, testData)
-}
-
-// Delete a block that exists on volume 2 of the keep servers. Expect
-// the first locator in volume 1 to be unaffected.
-func (s *HandlerSuite) TestTrashWorkerIntegration_LocatorInVolume2(c *check.C) {
-	s.cluster.Collections.BlobTrash = true
-	testData := TrashWorkerTestData{
-		Locator1: TestHash,
-		Block1:   TestBlock,
-
-		Locator2: TestHash2,
-		Block2:   TestBlock2,
-
-		CreateData: true,
-
-		DeleteLocator: TestHash2, // locator 2
-
-		ExpectLocator1: true,
-		ExpectLocator2: false,
-	}
-	s.performTrashWorkerTest(c, testData)
-}
-
-// Delete a block with matching mtime for locator in both
-// volumes. Expect locator to be deleted from both volumes.
-func (s *HandlerSuite) TestTrashWorkerIntegration_LocatorInBothVolumes(c *check.C) {
-	s.cluster.Collections.BlobTrash = true
-	testData := TrashWorkerTestData{
-		Locator1: TestHash,
-		Block1:   TestBlock,
-
-		Locator2: TestHash,
-		Block2:   TestBlock,
-
-		CreateData: true,
-
-		DeleteLocator: TestHash,
-
-		ExpectLocator1: false,
-		ExpectLocator2: false,
-	}
-	s.performTrashWorkerTest(c, testData)
-}
-
-// Same locator with different Mtimes exists in both volumes. Delete
-// the second and expect the first to be still around.
-func (s *HandlerSuite) TestTrashWorkerIntegration_MtimeMatchesForLocator1ButNotForLocator2(c *check.C) {
-	s.cluster.Collections.BlobTrash = true
-	testData := TrashWorkerTestData{
-		Locator1: TestHash,
-		Block1:   TestBlock,
-
-		Locator2: TestHash,
-		Block2:   TestBlock,
-
-		CreateData:      true,
-		DifferentMtimes: true,
-
-		DeleteLocator: TestHash,
-
-		ExpectLocator1: true,
-		ExpectLocator2: false,
-	}
-	s.performTrashWorkerTest(c, testData)
-}
-
-// Delete a block that exists on both volumes with matching mtimes,
-// but specify a MountUUID in the request so it only gets deleted from
-// the first volume.
-func (s *HandlerSuite) TestTrashWorkerIntegration_SpecifyMountUUID(c *check.C) {
-	s.cluster.Collections.BlobTrash = true
-	testData := TrashWorkerTestData{
-		Locator1: TestHash,
-		Block1:   TestBlock,
-
-		Locator2: TestHash,
-		Block2:   TestBlock,
-
-		CreateData: true,
-
-		DeleteLocator:    TestHash,
-		SpecifyMountUUID: true,
-
-		ExpectLocator1: true,
-		ExpectLocator2: true,
-	}
-	s.performTrashWorkerTest(c, testData)
-}
-
-// Two different locators in volume 1. Delete one of them. Expect the
-// other unaffected.
-func (s *HandlerSuite) TestTrashWorkerIntegration_TwoDifferentLocatorsInVolume1(c *check.C) {
-	s.cluster.Collections.BlobTrash = true
-	testData := TrashWorkerTestData{
-		Locator1: TestHash,
-		Block1:   TestBlock,
-
-		Locator2: TestHash2,
-		Block2:   TestBlock2,
-
-		CreateData:      true,
-		CreateInVolume1: true,
-
-		DeleteLocator: TestHash, // locator 1
-
-		ExpectLocator1: false,
-		ExpectLocator2: true,
-	}
-	s.performTrashWorkerTest(c, testData)
-}
-
-// Allow default Trash Life time to be used. Thus, the newly created
-// block will not be deleted because its Mtime is within the trash
-// life time.
-func (s *HandlerSuite) TestTrashWorkerIntegration_SameLocatorInTwoVolumesWithDefaultTrashLifeTime(c *check.C) {
-	s.cluster.Collections.BlobTrash = true
-	testData := TrashWorkerTestData{
-		Locator1: TestHash,
-		Block1:   TestBlock,
-
-		Locator2: TestHash2,
-		Block2:   TestBlock2,
-
-		CreateData:      true,
-		CreateInVolume1: true,
-
-		UseTrashLifeTime: true,
-
-		DeleteLocator: TestHash, // locator 1
-
-		// Since trash life time is in effect, block won't be deleted.
-		ExpectLocator1: true,
-		ExpectLocator2: true,
-	}
-	s.performTrashWorkerTest(c, testData)
-}
-
-// Delete a block with matching mtime for locator in both volumes, but
-// EnableDelete is false, so block won't be deleted.
-func (s *HandlerSuite) TestTrashWorkerIntegration_DisabledDelete(c *check.C) {
+func (s *routerSuite) TestTrashList_Clear(c *C) {
 	s.cluster.Collections.BlobTrash = false
-	testData := TrashWorkerTestData{
-		Locator1: TestHash,
-		Block1:   TestBlock,
-
-		Locator2: TestHash,
-		Block2:   TestBlock,
-
-		CreateData: true,
-
-		DeleteLocator: TestHash,
-
-		ExpectLocator1: true,
-		ExpectLocator2: true,
-	}
-	s.performTrashWorkerTest(c, testData)
+	router, cancel := testRouter(c, s.cluster, nil)
+	defer cancel()
+
+	resp := call(router, "PUT", "http://example/trash", s.cluster.SystemRootToken, []byte(`
+		[
+		 {
+		  "locator":"acbd18db4cc2f85cedef654fccc4a4d8+3",
+		  "block_mtime":1707249451308502672,
+		  "mount_uuid":"zzzzz-nyw5e-000000000000000"
+		 }
+		]
+		`), nil)
+	c.Check(resp.Code, Equals, http.StatusOK)
+	c.Check(router.trasher.todo, DeepEquals, []TrashListItem{{
+		Locator:    "acbd18db4cc2f85cedef654fccc4a4d8+3",
+		BlockMtime: 1707249451308502672,
+		MountUUID:  "zzzzz-nyw5e-000000000000000",
+	}})
+
+	resp = call(router, "PUT", "http://example/trash", s.cluster.SystemRootToken, []byte("[]"), nil)
+	c.Check(resp.Code, Equals, http.StatusOK)
+	c.Check(router.trasher.todo, HasLen, 0)
 }
 
-func (s *HandlerSuite) performTrashWorkerTest(c *check.C, testData TrashWorkerTestData) {
-	c.Assert(s.handler.setup(context.Background(), s.cluster, "", prometheus.NewRegistry(), testServiceURL), check.IsNil)
-	// Replace the router's trashq -- which the worker goroutines
-	// started by setup() are now receiving from -- with a new
-	// one, so we can see what the handler sends to it.
-	trashq := NewWorkQueue()
-	s.handler.Handler.(*router).trashq = trashq
-
-	// Put test content
-	mounts := s.handler.volmgr.AllWritable()
-	if testData.CreateData {
-		mounts[0].Put(context.Background(), testData.Locator1, testData.Block1)
-		mounts[0].Put(context.Background(), testData.Locator1+".meta", []byte("metadata"))
-
-		if testData.CreateInVolume1 {
-			mounts[0].Put(context.Background(), testData.Locator2, testData.Block2)
-			mounts[0].Put(context.Background(), testData.Locator2+".meta", []byte("metadata"))
-		} else {
-			mounts[1].Put(context.Background(), testData.Locator2, testData.Block2)
-			mounts[1].Put(context.Background(), testData.Locator2+".meta", []byte("metadata"))
-		}
-	}
-
-	oldBlockTime := time.Now().Add(-s.cluster.Collections.BlobSigningTTL.Duration() - time.Minute)
-
-	// Create TrashRequest for the test
-	trashRequest := TrashRequest{
-		Locator:    testData.DeleteLocator,
-		BlockMtime: oldBlockTime.UnixNano(),
-	}
-	if testData.SpecifyMountUUID {
-		trashRequest.MountUUID = s.handler.volmgr.Mounts()[0].UUID
-	}
-
-	// Run trash worker and put the trashRequest on trashq
-	trashList := list.New()
-	trashList.PushBack(trashRequest)
-
-	if !testData.UseTrashLifeTime {
-		// Trash worker would not delete block if its Mtime is
-		// within trash life time. Back-date the block to
-		// allow the deletion to succeed.
-		for _, mnt := range mounts {
-			mnt.Volume.(*MockVolume).Timestamps[testData.DeleteLocator] = oldBlockTime
-			if testData.DifferentMtimes {
-				oldBlockTime = oldBlockTime.Add(time.Second)
+func (s *routerSuite) TestTrashList_Execute(c *C) {
+	s.cluster.Collections.BlobTrashConcurrency = 1
+	s.cluster.Volumes = map[string]arvados.Volume{
+		"zzzzz-nyw5e-000000000000000": {Replication: 1, Driver: "stub"},
+		"zzzzz-nyw5e-111111111111111": {Replication: 1, Driver: "stub"},
+		"zzzzz-nyw5e-222222222222222": {Replication: 1, Driver: "stub", ReadOnly: true},
+		"zzzzz-nyw5e-333333333333333": {Replication: 1, Driver: "stub", ReadOnly: true, AllowTrashWhenReadOnly: true},
+	}
+	router, cancel := testRouter(c, s.cluster, nil)
+	defer cancel()
+
+	var mounts []struct {
+		UUID     string
+		DeviceID string `json:"device_id"`
+	}
+	resp := call(router, "GET", "http://example/mounts", s.cluster.SystemRootToken, nil, nil)
+	c.Check(resp.Code, Equals, http.StatusOK)
+	err := json.Unmarshal(resp.Body.Bytes(), &mounts)
+	c.Assert(err, IsNil)
+	c.Assert(mounts, HasLen, 4)
+
+	// Sort mounts by UUID
+	sort.Slice(mounts, func(i, j int) bool {
+		return mounts[i].UUID < mounts[j].UUID
+	})
+
+	// Make vols (stub volumes) in same order as mounts
+	var vols []*stubVolume
+	for _, mount := range mounts {
+		vols = append(vols, router.keepstore.mounts[mount.UUID].volume.(*stubVolume))
+	}
+
+	// The "trial" loop below will construct the trashList which
+	// we'll send to trasher via router, plus a slice of checks
+	// which we'll run after the trasher has finished executing
+	// the list.
+	var trashList []TrashListItem
+	var checks []func()
+
+	tNew := time.Now().Add(-s.cluster.Collections.BlobSigningTTL.Duration() / 2)
+	tOld := time.Now().Add(-s.cluster.Collections.BlobSigningTTL.Duration() - time.Second)
+
+	for _, trial := range []struct {
+		comment        string
+		storeMtime     []time.Time
+		trashListItems []TrashListItem
+		expectData     []bool
+	}{
+		{
+			comment:    "timestamp matches, but is not old enough to trash => skip",
+			storeMtime: []time.Time{tNew},
+			trashListItems: []TrashListItem{
+				{
+					BlockMtime: tNew.UnixNano(),
+					MountUUID:  mounts[0].UUID,
+				},
+			},
+			expectData: []bool{true},
+		},
+		{
+			comment:    "timestamp matches, and is old enough => trash",
+			storeMtime: []time.Time{tOld},
+			trashListItems: []TrashListItem{
+				{
+					BlockMtime: tOld.UnixNano(),
+					MountUUID:  mounts[0].UUID,
+				},
+			},
+			expectData: []bool{false},
+		},
+		{
+			comment:    "timestamp matches and is old enough on mount 0, but the request specifies mount 1, where timestamp does not match => skip",
+			storeMtime: []time.Time{tOld, tOld.Add(-time.Second)},
+			trashListItems: []TrashListItem{
+				{
+					BlockMtime: tOld.UnixNano(),
+					MountUUID:  mounts[1].UUID,
+				},
+			},
+			expectData: []bool{true, true},
+		},
+		{
+			comment:    "MountUUID unspecified => trash from any mount where timestamp matches, leave alone elsewhere",
+			storeMtime: []time.Time{tOld, tOld.Add(-time.Second)},
+			trashListItems: []TrashListItem{
+				{
+					BlockMtime: tOld.UnixNano(),
+				},
+			},
+			expectData: []bool{false, true},
+		},
+		{
+			comment:    "MountUUID unspecified => trash from multiple mounts if timestamp matches, but skip readonly volumes unless AllowTrashWhenReadOnly",
+			storeMtime: []time.Time{tOld, tOld, tOld, tOld},
+			trashListItems: []TrashListItem{
+				{
+					BlockMtime: tOld.UnixNano(),
+				},
+			},
+			expectData: []bool{false, false, true, false},
+		},
+		{
+			comment:    "readonly MountUUID specified => skip",
+			storeMtime: []time.Time{tOld, tOld, tOld},
+			trashListItems: []TrashListItem{
+				{
+					BlockMtime: tOld.UnixNano(),
+					MountUUID:  mounts[2].UUID,
+				},
+			},
+			expectData: []bool{true, true, true},
+		},
+	} {
+		trial := trial
+		data := []byte(fmt.Sprintf("trial %+v", trial))
+		hash := fmt.Sprintf("%x", md5.Sum(data))
+		for i, t := range trial.storeMtime {
+			if t.IsZero() {
+				continue
 			}
+			err := vols[i].BlockWrite(context.Background(), hash, data)
+			c.Assert(err, IsNil)
+			err = vols[i].blockTouchWithTime(hash, t)
+			c.Assert(err, IsNil)
 		}
-	}
-	go RunTrashWorker(s.handler.volmgr, ctxlog.TestLogger(c), s.cluster, trashq)
-
-	// Install gate so all local operations block until we say go
-	gate := make(chan struct{})
-	for _, mnt := range mounts {
-		mnt.Volume.(*MockVolume).Gate = gate
-	}
-
-	assertStatusItem := func(k string, expect float64) {
-		if v := getStatusItem(s.handler, "TrashQueue", k); v != expect {
-			c.Errorf("Got %s %v, expected %v", k, v, expect)
-		}
-	}
-
-	assertStatusItem("InProgress", 0)
-	assertStatusItem("Queued", 0)
-
-	listLen := trashList.Len()
-	trashq.ReplaceQueue(trashList)
-
-	// Wait for worker to take request(s)
-	expectEqualWithin(c, time.Second, listLen, func() interface{} { return trashq.Status().InProgress })
-
-	// Ensure status.json also reports work is happening
-	assertStatusItem("InProgress", float64(1))
-	assertStatusItem("Queued", float64(listLen-1))
-
-	// Let worker proceed
-	close(gate)
-
-	// Wait for worker to finish
-	expectEqualWithin(c, time.Second, 0, func() interface{} { return trashq.Status().InProgress })
-
-	// Verify Locator1 to be un/deleted as expected
-	buf := make([]byte, BlockSize)
-	size, err := GetBlock(context.Background(), s.handler.volmgr, testData.Locator1, buf, nil)
-	if testData.ExpectLocator1 {
-		if size == 0 || err != nil {
-			c.Errorf("Expected Locator1 to be still present: %s", testData.Locator1)
+		for _, item := range trial.trashListItems {
+			item.Locator = fmt.Sprintf("%s+%d", hash, len(data))
+			trashList = append(trashList, item)
 		}
-	} else {
-		if size > 0 || err == nil {
-			c.Errorf("Expected Locator1 to be deleted: %s", testData.Locator1)
+		for i, expect := range trial.expectData {
+			i, expect := i, expect
+			checks = append(checks, func() {
+				ent := vols[i].data[hash]
+				dataPresent := ent.data != nil && ent.trash.IsZero()
+				c.Check(dataPresent, Equals, expect, Commentf("%s mount %d (%s) expect present=%v but got len(ent.data)=%d ent.trash=%v // %s\nlog:\n%s", hash, i, vols[i].params.UUID, expect, len(ent.data), !ent.trash.IsZero(), trial.comment, vols[i].stubLog.String()))
+			})
 		}
 	}
 
-	// Verify Locator2 to be un/deleted as expected
-	if testData.Locator1 != testData.Locator2 {
-		size, err = GetBlock(context.Background(), s.handler.volmgr, testData.Locator2, buf, nil)
-		if testData.ExpectLocator2 {
-			if size == 0 || err != nil {
-				c.Errorf("Expected Locator2 to be still present: %s", testData.Locator2)
-			}
-		} else {
-			if size > 0 || err == nil {
-				c.Errorf("Expected Locator2 to be deleted: %s", testData.Locator2)
-			}
+	listjson, err := json.Marshal(trashList)
+	resp = call(router, "PUT", "http://example/trash", s.cluster.SystemRootToken, listjson, nil)
+	c.Check(resp.Code, Equals, http.StatusOK)
+
+	for {
+		router.trasher.cond.L.Lock()
+		todolen := len(router.trasher.todo)
+		router.trasher.cond.L.Unlock()
+		if todolen == 0 && router.trasher.inprogress.Load() == 0 {
+			break
 		}
+		time.Sleep(time.Millisecond)
 	}
-
-	// The DifferentMtimes test puts the same locator in two
-	// different volumes, but only one copy has an Mtime matching
-	// the trash request.
-	if testData.DifferentMtimes {
-		locatorFoundIn := 0
-		for _, volume := range s.handler.volmgr.AllReadable() {
-			buf := make([]byte, BlockSize)
-			if _, err := volume.Get(context.Background(), testData.Locator1, buf); err == nil {
-				locatorFoundIn = locatorFoundIn + 1
-			}
-		}
-		c.Check(locatorFoundIn, check.Equals, 1)
+	for _, check := range checks {
+		check()
 	}
 }
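
For readers who want to exercise the endpoint these tests cover, the sketch below shows one way a client could assemble and submit a trash list like the one TestTrashList_Clear sends. It is illustrative only, not part of the commit above: the item field names (locator, block_mtime, mount_uuid) are taken from the JSON payload in the test, while the keepstore URL, the token value, and the Bearer authorization header are assumptions about a running keepstore rather than anything shown in this diff.

// Illustrative sketch (hypothetical client), assuming a keepstore reachable
// at keepstoreURL that authorizes trash lists with the cluster's system root
// token sent as a Bearer token.
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
)

// trashListItem mirrors the JSON keys used in the test payload above.
type trashListItem struct {
	Locator    string `json:"locator"`
	BlockMtime int64  `json:"block_mtime"`
	MountUUID  string `json:"mount_uuid,omitempty"`
}

// sendTrashList marshals the items and submits them with PUT /trash.
func sendTrashList(keepstoreURL, token string, items []trashListItem) error {
	body, err := json.Marshal(items)
	if err != nil {
		return err
	}
	req, err := http.NewRequest(http.MethodPut, keepstoreURL+"/trash", bytes.NewReader(body))
	if err != nil {
		return err
	}
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("Authorization", "Bearer "+token) // assumed auth scheme
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("trash list rejected: %s", resp.Status)
	}
	return nil
}

func main() {
	// Example values copied from the test payload; the URL and token are placeholders.
	items := []trashListItem{{
		Locator:    "acbd18db4cc2f85cedef654fccc4a4d8+3",
		BlockMtime: 1707249451308502672,
		MountUUID:  "zzzzz-nyw5e-000000000000000",
	}}
	if err := sendTrashList("http://keepstore.example:25107", "placeholder-system-root-token", items); err != nil {
		fmt.Println(err)
	}
}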