2960: Refactor keepstore into a streaming server.
diff --git a/services/keepstore/trash_worker_test.go b/services/keepstore/trash_worker_test.go
index a1648c52cc9312b65339a348a94c306a9d5c1c29..0c304dbadec5498d8f736bb83cfeab88cbda6de4 100644
--- a/services/keepstore/trash_worker_test.go
+++ b/services/keepstore/trash_worker_test.go
 package keepstore
 
 import (
-       "container/list"
        "context"
+       "crypto/md5"
+       "encoding/json"
+       "fmt"
+       "net/http"
+       "sort"
        "time"
 
-       "git.arvados.org/arvados.git/sdk/go/ctxlog"
-       "github.com/prometheus/client_golang/prometheus"
-       check "gopkg.in/check.v1"
+       "git.arvados.org/arvados.git/sdk/go/arvados"
+       . "gopkg.in/check.v1"
 )
 
-type TrashWorkerTestData struct {
-       Locator1    string
-       Block1      []byte
-       BlockMtime1 int64
-
-       Locator2    string
-       Block2      []byte
-       BlockMtime2 int64
-
-       CreateData      bool
-       CreateInVolume1 bool
-
-       UseTrashLifeTime bool
-       DifferentMtimes  bool
-
-       DeleteLocator    string
-       SpecifyMountUUID bool
-
-       ExpectLocator1 bool
-       ExpectLocator2 bool
-}
-
-// Delete block that does not exist in any of the keep volumes.
-// Expect no errors.
-func (s *HandlerSuite) TestTrashWorkerIntegration_GetNonExistingLocator(c *check.C) {
-       s.cluster.Collections.BlobTrash = true
-       testData := TrashWorkerTestData{
-               Locator1: "5d41402abc4b2a76b9719d911017c592",
-               Block1:   []byte("hello"),
-
-               Locator2: "5d41402abc4b2a76b9719d911017c592",
-               Block2:   []byte("hello"),
-
-               CreateData: false,
-
-               DeleteLocator: "5d41402abc4b2a76b9719d911017c592",
-
-               ExpectLocator1: false,
-               ExpectLocator2: false,
-       }
-       s.performTrashWorkerTest(c, testData)
-}
-
-// Delete a block that exists on volume 1 of the keep servers. Expect
-// the second locator in volume 2 to be unaffected.
-func (s *HandlerSuite) TestTrashWorkerIntegration_LocatorInVolume1(c *check.C) {
-       s.cluster.Collections.BlobTrash = true
-       testData := TrashWorkerTestData{
-               Locator1: TestHash,
-               Block1:   TestBlock,
-
-               Locator2: TestHash2,
-               Block2:   TestBlock2,
-
-               CreateData: true,
-
-               DeleteLocator: TestHash, // first locator
-
-               ExpectLocator1: false,
-               ExpectLocator2: true,
-       }
-       s.performTrashWorkerTest(c, testData)
-}
-
-// Delete a block that exists on volume 2 of the keep servers. Expect
-// the first locator in volume 1 to be unaffected.
-func (s *HandlerSuite) TestTrashWorkerIntegration_LocatorInVolume2(c *check.C) {
-       s.cluster.Collections.BlobTrash = true
-       testData := TrashWorkerTestData{
-               Locator1: TestHash,
-               Block1:   TestBlock,
-
-               Locator2: TestHash2,
-               Block2:   TestBlock2,
-
-               CreateData: true,
-
-               DeleteLocator: TestHash2, // locator 2
-
-               ExpectLocator1: true,
-               ExpectLocator2: false,
-       }
-       s.performTrashWorkerTest(c, testData)
-}
-
-// Delete a block with matching mtime for locator in both
-// volumes. Expect locator to be deleted from both volumes.
-func (s *HandlerSuite) TestTrashWorkerIntegration_LocatorInBothVolumes(c *check.C) {
-       s.cluster.Collections.BlobTrash = true
-       testData := TrashWorkerTestData{
-               Locator1: TestHash,
-               Block1:   TestBlock,
-
-               Locator2: TestHash,
-               Block2:   TestBlock,
-
-               CreateData: true,
-
-               DeleteLocator: TestHash,
-
-               ExpectLocator1: false,
-               ExpectLocator2: false,
-       }
-       s.performTrashWorkerTest(c, testData)
-}
-
-// Same locator with different Mtimes exists in both volumes. Delete
-// the second and expect the first to be still around.
-func (s *HandlerSuite) TestTrashWorkerIntegration_MtimeMatchesForLocator1ButNotForLocator2(c *check.C) {
-       s.cluster.Collections.BlobTrash = true
-       testData := TrashWorkerTestData{
-               Locator1: TestHash,
-               Block1:   TestBlock,
-
-               Locator2: TestHash,
-               Block2:   TestBlock,
-
-               CreateData:      true,
-               DifferentMtimes: true,
-
-               DeleteLocator: TestHash,
-
-               ExpectLocator1: true,
-               ExpectLocator2: false,
-       }
-       s.performTrashWorkerTest(c, testData)
-}
-
-// Delete a block that exists on both volumes with matching mtimes,
-// but specify a MountUUID in the request so it only gets deleted from
-// the first volume.
-func (s *HandlerSuite) TestTrashWorkerIntegration_SpecifyMountUUID(c *check.C) {
-       s.cluster.Collections.BlobTrash = true
-       testData := TrashWorkerTestData{
-               Locator1: TestHash,
-               Block1:   TestBlock,
-
-               Locator2: TestHash,
-               Block2:   TestBlock,
-
-               CreateData: true,
-
-               DeleteLocator:    TestHash,
-               SpecifyMountUUID: true,
-
-               ExpectLocator1: true,
-               ExpectLocator2: true,
-       }
-       s.performTrashWorkerTest(c, testData)
-}
-
-// Two different locators in volume 1. Delete one of them. Expect the
-// other unaffected.
-func (s *HandlerSuite) TestTrashWorkerIntegration_TwoDifferentLocatorsInVolume1(c *check.C) {
-       s.cluster.Collections.BlobTrash = true
-       testData := TrashWorkerTestData{
-               Locator1: TestHash,
-               Block1:   TestBlock,
-
-               Locator2: TestHash2,
-               Block2:   TestBlock2,
-
-               CreateData:      true,
-               CreateInVolume1: true,
-
-               DeleteLocator: TestHash, // locator 1
-
-               ExpectLocator1: false,
-               ExpectLocator2: true,
-       }
-       s.performTrashWorkerTest(c, testData)
-}
-
-// Allow default Trash Life time to be used. Thus, the newly created
-// block will not be deleted because its Mtime is within the trash
-// life time.
-func (s *HandlerSuite) TestTrashWorkerIntegration_SameLocatorInTwoVolumesWithDefaultTrashLifeTime(c *check.C) {
-       s.cluster.Collections.BlobTrash = true
-       testData := TrashWorkerTestData{
-               Locator1: TestHash,
-               Block1:   TestBlock,
-
-               Locator2: TestHash2,
-               Block2:   TestBlock2,
-
-               CreateData:      true,
-               CreateInVolume1: true,
-
-               UseTrashLifeTime: true,
-
-               DeleteLocator: TestHash, // locator 1
-
-               // Since trash life time is in effect, block won't be deleted.
-               ExpectLocator1: true,
-               ExpectLocator2: true,
-       }
-       s.performTrashWorkerTest(c, testData)
-}
-
-// Delete a block with matching mtime for locator in both volumes, but
-// EnableDelete is false, so block won't be deleted.
-func (s *HandlerSuite) TestTrashWorkerIntegration_DisabledDelete(c *check.C) {
+func (s *routerSuite) TestTrashList_Clear(c *C) {
        s.cluster.Collections.BlobTrash = false
-       testData := TrashWorkerTestData{
-               Locator1: TestHash,
-               Block1:   TestBlock,
-
-               Locator2: TestHash,
-               Block2:   TestBlock,
-
-               CreateData: true,
-
-               DeleteLocator: TestHash,
-
-               ExpectLocator1: true,
-               ExpectLocator2: true,
-       }
-       s.performTrashWorkerTest(c, testData)
+       router, cancel := testRouter(c, s.cluster, nil)
+       defer cancel()
+
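+       // A PUT to /trash with the system root token replaces the
+       // trasher's todo list; the queued item should match the
+       // request body.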
+       resp := call(router, "PUT", "http://example/trash", s.cluster.SystemRootToken, []byte(`
+               [
+                {
+                 "locator":"acbd18db4cc2f85cedef654fccc4a4d8+3",
+                 "block_mtime":1707249451308502672,
+                 "mount_uuid":"zzzzz-nyw5e-000000000000000"
+                }
+               ]
+               `), nil)
+       c.Check(resp.Code, Equals, http.StatusOK)
+       c.Check(router.trasher.todo, DeepEquals, []TrashListItem{{
+               Locator:    "acbd18db4cc2f85cedef654fccc4a4d8+3",
+               BlockMtime: 1707249451308502672,
+               MountUUID:  "zzzzz-nyw5e-000000000000000",
+       }})
+
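+       // Replacing the list with an empty one clears the queue.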
+       resp = call(router, "PUT", "http://example/trash", s.cluster.SystemRootToken, []byte("[]"), nil)
+       c.Check(resp.Code, Equals, http.StatusOK)
+       c.Check(router.trasher.todo, HasLen, 0)
 }
 
-func (s *HandlerSuite) performTrashWorkerTest(c *check.C, testData TrashWorkerTestData) {
-       c.Assert(s.handler.setup(context.Background(), s.cluster, "", prometheus.NewRegistry(), testServiceURL), check.IsNil)
-       // Replace the router's trashq -- which the worker goroutines
-       // started by setup() are now receiving from -- with a new
-       // one, so we can see what the handler sends to it.
-       trashq := NewWorkQueue()
-       s.handler.Handler.(*router).trashq = trashq
-
-       // Put test content
-       mounts := s.handler.volmgr.AllWritable()
-       if testData.CreateData {
-               mounts[0].Put(context.Background(), testData.Locator1, testData.Block1)
-               mounts[0].Put(context.Background(), testData.Locator1+".meta", []byte("metadata"))
-
-               if testData.CreateInVolume1 {
-                       mounts[0].Put(context.Background(), testData.Locator2, testData.Block2)
-                       mounts[0].Put(context.Background(), testData.Locator2+".meta", []byte("metadata"))
-               } else {
-                       mounts[1].Put(context.Background(), testData.Locator2, testData.Block2)
-                       mounts[1].Put(context.Background(), testData.Locator2+".meta", []byte("metadata"))
-               }
-       }
-
-       oldBlockTime := time.Now().Add(-s.cluster.Collections.BlobSigningTTL.Duration() - time.Minute)
-
-       // Create TrashRequest for the test
-       trashRequest := TrashRequest{
-               Locator:    testData.DeleteLocator,
-               BlockMtime: oldBlockTime.UnixNano(),
-       }
-       if testData.SpecifyMountUUID {
-               trashRequest.MountUUID = s.handler.volmgr.Mounts()[0].UUID
-       }
-
-       // Run trash worker and put the trashRequest on trashq
-       trashList := list.New()
-       trashList.PushBack(trashRequest)
-
-       if !testData.UseTrashLifeTime {
-               // Trash worker would not delete block if its Mtime is
-               // within trash life time. Back-date the block to
-               // allow the deletion to succeed.
-               for _, mnt := range mounts {
-                       mnt.Volume.(*MockVolume).Timestamps[testData.DeleteLocator] = oldBlockTime
-                       if testData.DifferentMtimes {
-                               oldBlockTime = oldBlockTime.Add(time.Second)
+func (s *routerSuite) TestTrashList_Execute(c *C) {
+       s.cluster.Collections.BlobTrashConcurrency = 1
+       s.cluster.Volumes = map[string]arvados.Volume{
+               "zzzzz-nyw5e-000000000000000": {Replication: 1, Driver: "stub"},
+               "zzzzz-nyw5e-111111111111111": {Replication: 1, Driver: "stub"},
+               "zzzzz-nyw5e-222222222222222": {Replication: 1, Driver: "stub", ReadOnly: true},
+               "zzzzz-nyw5e-333333333333333": {Replication: 1, Driver: "stub", ReadOnly: true, AllowTrashWhenReadOnly: true},
+       }
+       router, cancel := testRouter(c, s.cluster, nil)
+       defer cancel()
+
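+       // Fetch the mount list so the trials below can reference
+       // mount UUIDs.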
+       var mounts []struct {
+               UUID     string
+               DeviceID string `json:"device_id"`
+       }
+       resp := call(router, "GET", "http://example/mounts", s.cluster.SystemRootToken, nil, nil)
+       c.Check(resp.Code, Equals, http.StatusOK)
+       err := json.Unmarshal(resp.Body.Bytes(), &mounts)
+       c.Assert(err, IsNil)
+       c.Assert(mounts, HasLen, 4)
+
+       // Sort mounts by UUID
+       sort.Slice(mounts, func(i, j int) bool {
+               return mounts[i].UUID < mounts[j].UUID
+       })
+
+       // Build vols (stub volumes) in the same order as mounts
+       var vols []*stubVolume
+       for _, mount := range mounts {
+               vols = append(vols, router.keepstore.mounts[mount.UUID].volume.(*stubVolume))
+       }
+
+       // The "trial" loop below will construct the trashList which
+       // we'll send to trasher via router, plus a slice of checks
+       // which we'll run after the trasher has finished executing
+       // the list.
+       var trashList []TrashListItem
+       var checks []func()
+
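+       // tNew is within BlobSigningTTL, so matching blocks are too
+       // new to trash; tOld is past the TTL, so matching blocks are
+       // eligible.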
+       tNew := time.Now().Add(-s.cluster.Collections.BlobSigningTTL.Duration() / 2)
+       tOld := time.Now().Add(-s.cluster.Collections.BlobSigningTTL.Duration() - time.Second)
+
+       for _, trial := range []struct {
+               comment        string
+               storeMtime     []time.Time
+               trashListItems []TrashListItem
+               expectData     []bool
+       }{
+               {
+                       comment:    "timestamp matches, but is not old enough to trash => skip",
+                       storeMtime: []time.Time{tNew},
+                       trashListItems: []TrashListItem{
+                               {
+                                       BlockMtime: tNew.UnixNano(),
+                                       MountUUID:  mounts[0].UUID,
+                               },
+                       },
+                       expectData: []bool{true},
+               },
+               {
+                       comment:    "timestamp matches, and is old enough => trash",
+                       storeMtime: []time.Time{tOld},
+                       trashListItems: []TrashListItem{
+                               {
+                                       BlockMtime: tOld.UnixNano(),
+                                       MountUUID:  mounts[0].UUID,
+                               },
+                       },
+                       expectData: []bool{false},
+               },
+               {
+                       comment:    "timestamp matches and is old enough on mount 0, but the request specifies mount 1, where timestamp does not match => skip",
+                       storeMtime: []time.Time{tOld, tOld.Add(-time.Second)},
+                       trashListItems: []TrashListItem{
+                               {
+                                       BlockMtime: tOld.UnixNano(),
+                                       MountUUID:  mounts[1].UUID,
+                               },
+                       },
+                       expectData: []bool{true, true},
+               },
+               {
+                       comment:    "MountUUID unspecified => trash from any mount where timestamp matches, leave alone elsewhere",
+                       storeMtime: []time.Time{tOld, tOld.Add(-time.Second)},
+                       trashListItems: []TrashListItem{
+                               {
+                                       BlockMtime: tOld.UnixNano(),
+                               },
+                       },
+                       expectData: []bool{false, true},
+               },
+               {
+                       comment:    "MountUUID unspecified => trash from multiple mounts if timestamp matches, but skip readonly volumes unless AllowTrashWhenReadOnly",
+                       storeMtime: []time.Time{tOld, tOld, tOld, tOld},
+                       trashListItems: []TrashListItem{
+                               {
+                                       BlockMtime: tOld.UnixNano(),
+                               },
+                       },
+                       expectData: []bool{false, false, true, false},
+               },
+               {
+                       comment:    "readonly MountUUID specified => skip",
+                       storeMtime: []time.Time{tOld, tOld, tOld},
+                       trashListItems: []TrashListItem{
+                               {
+                                       BlockMtime: tOld.UnixNano(),
+                                       MountUUID:  mounts[2].UUID,
+                               },
+                       },
+                       expectData: []bool{true, true, true},
+               },
+       } {
+               trial := trial // capture the range variable; the check closures below refer to it
+               data := []byte(fmt.Sprintf("trial %+v", trial))
+               hash := fmt.Sprintf("%x", md5.Sum(data))
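+       // Each trial stores distinct block data (including the
+       // unique comment string), so hashes never collide between
+       // trials.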
+               for i, t := range trial.storeMtime {
+                       if t.IsZero() {
+                               continue
                        }
+                       err := vols[i].BlockWrite(context.Background(), hash, data)
+                       c.Assert(err, IsNil)
+                       err = vols[i].blockTouchWithTime(hash, t)
+                       c.Assert(err, IsNil)
                }
-       }
-       go RunTrashWorker(s.handler.volmgr, ctxlog.TestLogger(c), s.cluster, trashq)
-
-       // Install gate so all local operations block until we say go
-       gate := make(chan struct{})
-       for _, mnt := range mounts {
-               mnt.Volume.(*MockVolume).Gate = gate
-       }
-
-       assertStatusItem := func(k string, expect float64) {
-               if v := getStatusItem(s.handler, "TrashQueue", k); v != expect {
-                       c.Errorf("Got %s %v, expected %v", k, v, expect)
-               }
-       }
-
-       assertStatusItem("InProgress", 0)
-       assertStatusItem("Queued", 0)
-
-       listLen := trashList.Len()
-       trashq.ReplaceQueue(trashList)
-
-       // Wait for worker to take request(s)
-       expectEqualWithin(c, time.Second, listLen, func() interface{} { return trashq.Status().InProgress })
-
-       // Ensure status.json also reports work is happening
-       assertStatusItem("InProgress", float64(1))
-       assertStatusItem("Queued", float64(listLen-1))
-
-       // Let worker proceed
-       close(gate)
-
-       // Wait for worker to finish
-       expectEqualWithin(c, time.Second, 0, func() interface{} { return trashq.Status().InProgress })
-
-       // Verify Locator1 to be un/deleted as expected
-       buf := make([]byte, BlockSize)
-       size, err := GetBlock(context.Background(), s.handler.volmgr, testData.Locator1, buf, nil)
-       if testData.ExpectLocator1 {
-               if size == 0 || err != nil {
-                       c.Errorf("Expected Locator1 to be still present: %s", testData.Locator1)
+               for _, item := range trial.trashListItems {
+                       item.Locator = fmt.Sprintf("%s+%d", hash, len(data))
+                       trashList = append(trashList, item)
                }
-       } else {
-               if size > 0 || err == nil {
-                       c.Errorf("Expected Locator1 to be deleted: %s", testData.Locator1)
+               for i, expect := range trial.expectData {
+                       i, expect := i, expect // capture range variables for the closure
+                       checks = append(checks, func() {
+                               ent := vols[i].data[hash]
+                               dataPresent := ent.data != nil && ent.trash.IsZero()
+                               c.Check(dataPresent, Equals, expect, Commentf("%s mount %d (%s) expect present=%v but got len(ent.data)=%d ent.trash=%v // %s\nlog:\n%s", hash, i, vols[i].params.UUID, expect, len(ent.data), !ent.trash.IsZero(), trial.comment, vols[i].stubLog.String()))
+                       })
                }
        }
 
-       // Verify Locator2 to be un/deleted as expected
-       if testData.Locator1 != testData.Locator2 {
-               size, err = GetBlock(context.Background(), s.handler.volmgr, testData.Locator2, buf, nil)
-               if testData.ExpectLocator2 {
-                       if size == 0 || err != nil {
-                               c.Errorf("Expected Locator2 to be still present: %s", testData.Locator2)
-                       }
-               } else {
-                       if size > 0 || err == nil {
-                               c.Errorf("Expected Locator2 to be deleted: %s", testData.Locator2)
-                       }
+       listjson, err := json.Marshal(trashList)
+       c.Assert(err, IsNil)
+       resp = call(router, "PUT", "http://example/trash", s.cluster.SystemRootToken, listjson, nil)
+       c.Check(resp.Code, Equals, http.StatusOK)
+
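+       // Poll until the trasher has drained its todo list and has no
+       // items in progress.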
+       for {
+               router.trasher.cond.L.Lock()
+               todolen := len(router.trasher.todo)
+               router.trasher.cond.L.Unlock()
+               if todolen == 0 && router.trasher.inprogress.Load() == 0 {
+                       break
                }
+               time.Sleep(time.Millisecond)
        }
 
-       // The DifferentMtimes test puts the same locator in two
-       // different volumes, but only one copy has an Mtime matching
-       // the trash request.
-       if testData.DifferentMtimes {
-               locatorFoundIn := 0
-               for _, volume := range s.handler.volmgr.AllReadable() {
-                       buf := make([]byte, BlockSize)
-                       if _, err := volume.Get(context.Background(), testData.Locator1, buf); err == nil {
-                               locatorFoundIn = locatorFoundIn + 1
-                       }
-               }
-               c.Check(locatorFoundIn, check.Equals, 1)
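+       // All trash operations are done; run the accumulated checks.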
+       for _, check := range checks {
+               check()
        }
 }