Merge branch '8784-dir-listings'
[arvados.git] / services / keepstore / trash_worker_test.go
index 3031c2582d57c1e9afa1a1e3fdd9c1fbed6d8a0d..c5a410b06f05c151bb401c31f9377316c573678a 100644 (file)
@@ -1,7 +1,12 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
 package main
 
 import (
        "container/list"
+       "context"
        "testing"
        "time"
 )
@@ -15,13 +20,14 @@ type TrashWorkerTestData struct {
        Block2      []byte
        BlockMtime2 int64
 
-       CreateData       bool
-       CreateInVolume1  bool
-       UseDelayToCreate bool
+       CreateData      bool
+       CreateInVolume1 bool
 
        UseTrashLifeTime bool
+       DifferentMtimes  bool
 
-       DeleteLocator string
+       DeleteLocator    string
+       SpecifyMountUUID bool
 
        ExpectLocator1 bool
        ExpectLocator2 bool
@@ -31,6 +37,7 @@ type TrashWorkerTestData struct {
    Expect no errors.
 */
 func TestTrashWorkerIntegration_GetNonExistingLocator(t *testing.T) {
+       theConfig.EnableDelete = true
        testData := TrashWorkerTestData{
                Locator1: "5d41402abc4b2a76b9719d911017c592",
                Block1:   []byte("hello"),
@@ -52,16 +59,17 @@ func TestTrashWorkerIntegration_GetNonExistingLocator(t *testing.T) {
    Expect the second locator in volume 2 to be unaffected.
 */
 func TestTrashWorkerIntegration_LocatorInVolume1(t *testing.T) {
+       theConfig.EnableDelete = true
        testData := TrashWorkerTestData{
-               Locator1: TEST_HASH,
-               Block1:   TEST_BLOCK,
+               Locator1: TestHash,
+               Block1:   TestBlock,
 
-               Locator2: TEST_HASH_2,
-               Block2:   TEST_BLOCK_2,
+               Locator2: TestHash2,
+               Block2:   TestBlock2,
 
                CreateData: true,
 
-               DeleteLocator: TEST_HASH, // first locator
+               DeleteLocator: TestHash, // first locator
 
                ExpectLocator1: false,
                ExpectLocator2: true,
@@ -73,16 +81,17 @@ func TestTrashWorkerIntegration_LocatorInVolume1(t *testing.T) {
    Expect the first locator in volume 1 to be unaffected.
 */
 func TestTrashWorkerIntegration_LocatorInVolume2(t *testing.T) {
+       theConfig.EnableDelete = true
        testData := TrashWorkerTestData{
-               Locator1: TEST_HASH,
-               Block1:   TEST_BLOCK,
+               Locator1: TestHash,
+               Block1:   TestBlock,
 
-               Locator2: TEST_HASH_2,
-               Block2:   TEST_BLOCK_2,
+               Locator2: TestHash2,
+               Block2:   TestBlock2,
 
                CreateData: true,
 
-               DeleteLocator: TEST_HASH_2, // locator 2
+               DeleteLocator: TestHash2, // locator 2
 
                ExpectLocator1: true,
                ExpectLocator2: false,
@@ -94,16 +103,17 @@ func TestTrashWorkerIntegration_LocatorInVolume2(t *testing.T) {
    Expect locator to be deleted from both volumes.
 */
 func TestTrashWorkerIntegration_LocatorInBothVolumes(t *testing.T) {
+       theConfig.EnableDelete = true
        testData := TrashWorkerTestData{
-               Locator1: TEST_HASH,
-               Block1:   TEST_BLOCK,
+               Locator1: TestHash,
+               Block1:   TestBlock,
 
-               Locator2: TEST_HASH,
-               Block2:   TEST_BLOCK,
+               Locator2: TestHash,
+               Block2:   TestBlock,
 
                CreateData: true,
 
-               DeleteLocator: TEST_HASH,
+               DeleteLocator: TestHash,
 
                ExpectLocator1: false,
                ExpectLocator2: false,
@@ -115,17 +125,18 @@ func TestTrashWorkerIntegration_LocatorInBothVolumes(t *testing.T) {
    Delete the second and expect the first to be still around.
 */
 func TestTrashWorkerIntegration_MtimeMatchesForLocator1ButNotForLocator2(t *testing.T) {
+       theConfig.EnableDelete = true
        testData := TrashWorkerTestData{
-               Locator1: TEST_HASH,
-               Block1:   TEST_BLOCK,
+               Locator1: TestHash,
+               Block1:   TestBlock,
 
-               Locator2: TEST_HASH,
-               Block2:   TEST_BLOCK,
+               Locator2: TestHash,
+               Block2:   TestBlock,
 
-               CreateData:       true,
-               UseDelayToCreate: true,
+               CreateData:      true,
+               DifferentMtimes: true,
 
-               DeleteLocator: TEST_HASH,
+               DeleteLocator: TestHash,
 
                ExpectLocator1: true,
                ExpectLocator2: false,
@@ -133,22 +144,46 @@ func TestTrashWorkerIntegration_MtimeMatchesForLocator1ButNotForLocator2(t *test
        performTrashWorkerTest(testData, t)
 }
 
+// Delete a block that exists on both volumes with matching mtimes,
+// but put the first mount's UUID in the request so only that copy is
+// trashed; the locator is expected to stay readable afterwards.
+func TestTrashWorkerIntegration_SpecifyMountUUID(t *testing.T) {
+       theConfig.EnableDelete = true
+       testData := TrashWorkerTestData{
+               Locator1: TestHash,
+               Block1:   TestBlock,
+
+               Locator2: TestHash,
+               Block2:   TestBlock,
+
+               CreateData: true,
+
+               DeleteLocator:    TestHash,
+               SpecifyMountUUID: true,
+
+               ExpectLocator1: true,
+               ExpectLocator2: true,
+       }
+       performTrashWorkerTest(testData, t)
+}
+
 /* Two different locators in volume 1.
    Delete one of them.
    Expect the other unaffected.
 */
 func TestTrashWorkerIntegration_TwoDifferentLocatorsInVolume1(t *testing.T) {
+       theConfig.EnableDelete = true
        testData := TrashWorkerTestData{
-               Locator1: TEST_HASH,
-               Block1:   TEST_BLOCK,
+               Locator1: TestHash,
+               Block1:   TestBlock,
 
-               Locator2: TEST_HASH_2,
-               Block2:   TEST_BLOCK_2,
+               Locator2: TestHash2,
+               Block2:   TestBlock2,
 
                CreateData:      true,
                CreateInVolume1: true,
 
-               DeleteLocator: TEST_HASH, // locator 1
+               DeleteLocator: TestHash, // locator 1
 
                ExpectLocator1: false,
                ExpectLocator2: true,
@@ -157,22 +192,23 @@ func TestTrashWorkerIntegration_TwoDifferentLocatorsInVolume1(t *testing.T) {
 }
 
 /* Allow default Trash Life time to be used. Thus, the newly created block
-   will not be deleted becuase its Mtime is within the trash life time.
+   will not be deleted because its Mtime is within the trash life time.
 */
 func TestTrashWorkerIntegration_SameLocatorInTwoVolumesWithDefaultTrashLifeTime(t *testing.T) {
+       theConfig.EnableDelete = true
        testData := TrashWorkerTestData{
-               Locator1: TEST_HASH,
-               Block1:   TEST_BLOCK,
+               Locator1: TestHash,
+               Block1:   TestBlock,
 
-               Locator2: TEST_HASH_2,
-               Block2:   TEST_BLOCK_2,
+               Locator2: TestHash2,
+               Block2:   TestBlock2,
 
                CreateData:      true,
                CreateInVolume1: true,
 
                UseTrashLifeTime: true,
 
-               DeleteLocator: TEST_HASH, // locator 1
+               DeleteLocator: TestHash, // locator 1
 
                // Since trash life time is in effect, block won't be deleted.
                ExpectLocator1: true,
@@ -181,100 +217,150 @@ func TestTrashWorkerIntegration_SameLocatorInTwoVolumesWithDefaultTrashLifeTime(
        performTrashWorkerTest(testData, t)
 }
 
+/* Request deletion of a block whose mtime matches on both volumes while
+   theConfig.EnableDelete is false, so neither copy should be deleted.
+*/
+func TestTrashWorkerIntegration_DisabledDelete(t *testing.T) {
+       theConfig.EnableDelete = false
+       testData := TrashWorkerTestData{
+               Locator1: TestHash,
+               Block1:   TestBlock,
+
+               Locator2: TestHash,
+               Block2:   TestBlock,
+
+               CreateData: true,
+
+               DeleteLocator: TestHash,
+
+               ExpectLocator1: true,
+               ExpectLocator2: true,
+       }
+       performTrashWorkerTest(testData, t)
+}
+
 /* Perform the test */
 func performTrashWorkerTest(testData TrashWorkerTestData, t *testing.T) {
-       actual_permission_ttl := permission_ttl
-
        // Create Keep Volumes
        KeepVM = MakeTestVolumeManager(2)
+       defer KeepVM.Close()
 
        // Put test content
-       vols := KeepVM.Volumes()
+       vols := KeepVM.AllWritable()
        if testData.CreateData {
-               vols[0].Put(testData.Locator1, testData.Block1)
-               vols[0].Put(testData.Locator1+".meta", []byte("metadata"))
-
-               // One of the tests deletes a locator with different Mtimes in two different volumes
-               if testData.UseDelayToCreate {
-                       time.Sleep(1 * time.Second)
-               }
+               vols[0].Put(context.Background(), testData.Locator1, testData.Block1)
+               vols[0].Put(context.Background(), testData.Locator1+".meta", []byte("metadata"))
 
                if testData.CreateInVolume1 {
-                       vols[0].Put(testData.Locator2, testData.Block2)
-                       vols[0].Put(testData.Locator2+".meta", []byte("metadata"))
+                       vols[0].Put(context.Background(), testData.Locator2, testData.Block2)
+                       vols[0].Put(context.Background(), testData.Locator2+".meta", []byte("metadata"))
                } else {
-                       vols[1].Put(testData.Locator2, testData.Block2)
-                       vols[1].Put(testData.Locator2+".meta", []byte("metadata"))
+                       vols[1].Put(context.Background(), testData.Locator2, testData.Block2)
+                       vols[1].Put(context.Background(), testData.Locator2+".meta", []byte("metadata"))
                }
        }
 
+       oldBlockTime := time.Now().Add(-theConfig.BlobSignatureTTL.Duration() - time.Minute)
+
        // Create TrashRequest for the test
        trashRequest := TrashRequest{
                Locator:    testData.DeleteLocator,
-               BlockMtime: time.Now().Unix(),
+               BlockMtime: oldBlockTime.UnixNano(),
+       }
+       if testData.SpecifyMountUUID {
+               trashRequest.MountUUID = KeepVM.Mounts()[0].UUID
        }
-
-       // delay by permission_ttl to allow deletes to work
-       time.Sleep(1 * time.Second)
 
        // Run trash worker and put the trashRequest on trashq
        trashList := list.New()
        trashList.PushBack(trashRequest)
        trashq = NewWorkQueue()
+       defer trashq.Close()
 
-       // Trash worker would not delete block if its Mtime is within trash life time.
-       // Hence, we will have to bypass it to allow the deletion to succeed.
        if !testData.UseTrashLifeTime {
-               permission_ttl = time.Duration(1) * time.Second
+               // The trash worker will not delete a block whose Mtime
+               // is within the trash lifetime, so back-date the block
+               // on each writable volume to let the deletion succeed.
+               for _, v := range vols {
+                       v.(*MockVolume).Timestamps[testData.DeleteLocator] = oldBlockTime
+                       if testData.DifferentMtimes {
+                               oldBlockTime = oldBlockTime.Add(time.Second)
+                       }
+               }
        }
        go RunTrashWorker(trashq)
 
+       // Install a shared gate so all volume operations block until close(gate) below
+       gate := make(chan struct{})
+       for _, v := range vols {
+               v.(*MockVolume).Gate = gate
+       }
+
+       assertStatusItem := func(k string, expect float64) {
+               if v := getStatusItem("TrashQueue", k); v != expect {
+                       t.Errorf("Got %s %v, expected %v", k, v, expect)
+               }
+       }
+
+       assertStatusItem("InProgress", 0)
+       assertStatusItem("Queued", 0)
+
+       listLen := trashList.Len()
        trashq.ReplaceQueue(trashList)
-       time.Sleep(10 * time.Millisecond) // give a moment to finish processing the list
+
+       // Wait for the worker to pick up the queued request(s)
+       expectEqualWithin(t, time.Second, listLen, func() interface{} { return trashq.Status().InProgress })
+
+       // Ensure status.json also reports the request as in progress
+       assertStatusItem("InProgress", float64(1))
+       assertStatusItem("Queued", float64(listLen-1))
+
+       // Release the gate so the worker can proceed
+       close(gate)
+
+       // Wait for the worker to finish (InProgress drops back to 0)
+       expectEqualWithin(t, time.Second, 0, func() interface{} { return trashq.Status().InProgress })
 
        // Verify Locator1 to be un/deleted as expected
-       data, _ := GetBlock(testData.Locator1, false)
+       buf := make([]byte, BlockSize)
+       size, err := GetBlock(context.Background(), testData.Locator1, buf, nil)
        if testData.ExpectLocator1 {
-               if len(data) == 0 {
+               if size == 0 || err != nil {
                        t.Errorf("Expected Locator1 to be still present: %s", testData.Locator1)
                }
        } else {
-               if len(data) > 0 {
+               if size > 0 || err == nil {
                        t.Errorf("Expected Locator1 to be deleted: %s", testData.Locator1)
                }
        }
 
        // Verify Locator2 to be un/deleted as expected
        if testData.Locator1 != testData.Locator2 {
-               data, _ = GetBlock(testData.Locator2, false)
+               size, err = GetBlock(context.Background(), testData.Locator2, buf, nil)
                if testData.ExpectLocator2 {
-                       if len(data) == 0 {
+                       if size == 0 || err != nil {
                                t.Errorf("Expected Locator2 to be still present: %s", testData.Locator2)
                        }
                } else {
-                       if len(data) > 0 {
+                       if size > 0 || err == nil {
                                t.Errorf("Expected Locator2 to be deleted: %s", testData.Locator2)
                        }
                }
        }
 
-       // One test used the same locator in two different volumes but with different Mtime values
-       // Hence let's verify that only one volume has it and the other is deleted
-       if (testData.ExpectLocator1) &&
-               (testData.Locator1 == testData.Locator2) {
+       // The DifferentMtimes test puts the same locator in two
+       // volumes, but only the first copy's Mtime matches the trash
+       // request, so exactly one copy should be left afterwards.
+       if testData.DifferentMtimes {
                locatorFoundIn := 0
-               for _, volume := range KeepVM.Volumes() {
-                       if _, err := volume.Get(testData.Locator1); err == nil {
+               for _, volume := range KeepVM.AllReadable() {
+                       buf := make([]byte, BlockSize)
+                       if _, err := volume.Get(context.Background(), testData.Locator1, buf); err == nil {
                                locatorFoundIn = locatorFoundIn + 1
                        }
                }
                if locatorFoundIn != 1 {
-                       t.Errorf("Expected locator to be found in only one volume after deleting. But found: %s", locatorFoundIn)
+                       t.Errorf("Found %d copies of %s, expected 1", locatorFoundIn, testData.Locator1)
                }
        }
-
-       // Done
-       permission_ttl = actual_permission_ttl
-       trashq.Close()
-       KeepVM.Quit()
 }