6260: Expose queue sizes in /status.json. Fix sleep/race in trash_worker_test.
[arvados.git] / services / keepstore / trash_worker_test.go
index 0511b48d372fc3cc33b5513411512f624f12a199..433eef57863d751154df4e2c5e937da589b9bab3 100644 (file)
@@ -31,6 +31,7 @@ type TrashWorkerTestData struct {
    Expect no errors.
 */
 func TestTrashWorkerIntegration_GetNonExistingLocator(t *testing.T) {
+       never_delete = false
        testData := TrashWorkerTestData{
                Locator1: "5d41402abc4b2a76b9719d911017c592",
                Block1:   []byte("hello"),
@@ -52,6 +53,7 @@ func TestTrashWorkerIntegration_GetNonExistingLocator(t *testing.T) {
    Expect the second locator in volume 2 to be unaffected.
 */
 func TestTrashWorkerIntegration_LocatorInVolume1(t *testing.T) {
+       never_delete = false
        testData := TrashWorkerTestData{
                Locator1: TEST_HASH,
                Block1:   TEST_BLOCK,
@@ -73,6 +75,7 @@ func TestTrashWorkerIntegration_LocatorInVolume1(t *testing.T) {
    Expect the first locator in volume 1 to be unaffected.
 */
 func TestTrashWorkerIntegration_LocatorInVolume2(t *testing.T) {
+       never_delete = false
        testData := TrashWorkerTestData{
                Locator1: TEST_HASH,
                Block1:   TEST_BLOCK,
@@ -94,6 +97,7 @@ func TestTrashWorkerIntegration_LocatorInVolume2(t *testing.T) {
    Expect locator to be deleted from both volumes.
 */
 func TestTrashWorkerIntegration_LocatorInBothVolumes(t *testing.T) {
+       never_delete = false
        testData := TrashWorkerTestData{
                Locator1: TEST_HASH,
                Block1:   TEST_BLOCK,
@@ -115,6 +119,7 @@ func TestTrashWorkerIntegration_LocatorInBothVolumes(t *testing.T) {
    Delete the second and expect the first to be still around.
 */
 func TestTrashWorkerIntegration_MtimeMatchesForLocator1ButNotForLocator2(t *testing.T) {
+       never_delete = false
        testData := TrashWorkerTestData{
                Locator1: TEST_HASH,
                Block1:   TEST_BLOCK,
@@ -138,6 +143,7 @@ func TestTrashWorkerIntegration_MtimeMatchesForLocator1ButNotForLocator2(t *test
    Expect the other unaffected.
 */
 func TestTrashWorkerIntegration_TwoDifferentLocatorsInVolume1(t *testing.T) {
+       never_delete = false
        testData := TrashWorkerTestData{
                Locator1: TEST_HASH,
                Block1:   TEST_BLOCK,
@@ -160,6 +166,7 @@ func TestTrashWorkerIntegration_TwoDifferentLocatorsInVolume1(t *testing.T) {
    will not be deleted becuase its Mtime is within the trash life time.
 */
 func TestTrashWorkerIntegration_SameLocatorInTwoVolumesWithDefaultTrashLifeTime(t *testing.T) {
+       never_delete = false
        testData := TrashWorkerTestData{
                Locator1: TEST_HASH,
                Block1:   TEST_BLOCK,
@@ -181,6 +188,28 @@ func TestTrashWorkerIntegration_SameLocatorInTwoVolumesWithDefaultTrashLifeTime(
        performTrashWorkerTest(testData, t)
 }
 
+/* Delete a block with matching mtime for locator in both volumes, but never_delete is true,
+   so block won't be deleted.
+*/
+func TestTrashWorkerIntegration_NeverDelete(t *testing.T) {
+       never_delete = true
+       testData := TrashWorkerTestData{
+               Locator1: TEST_HASH,
+               Block1:   TEST_BLOCK,
+
+               Locator2: TEST_HASH,
+               Block2:   TEST_BLOCK,
+
+               CreateData: true,
+
+               DeleteLocator: TEST_HASH,
+
+               ExpectLocator1: true,
+               ExpectLocator2: true,
+       }
+       performTrashWorkerTest(testData, t)
+}
+
 /* Perform the test */
 func performTrashWorkerTest(testData TrashWorkerTestData, t *testing.T) {
        // Create Keep Volumes
@@ -229,8 +258,39 @@ func performTrashWorkerTest(testData TrashWorkerTestData, t *testing.T) {
        }
        go RunTrashWorker(trashq)
 
+       // Install gate so all local operations block until we say go
+       gate := make(chan struct{})
+       for _, v := range vols {
+               v.(*MockVolume).Gate = gate
+       }
+
+       assertStatusItem := func(k string, expect float64) {
+               if v := getStatusItem("TrashQueue", k); v != expect {
+                       t.Errorf("Got %s %v, expected %v", k, v, expect)
+               }
+       }
+
+       assertStatusItem("InProgress", 0)
+       assertStatusItem("Outstanding", 0)
+       assertStatusItem("Queued", 0)
+
+       listLen := trashList.Len()
        trashq.ReplaceQueue(trashList)
-       time.Sleep(10 * time.Millisecond) // give a moment to finish processing the list
+
+       // Wait for worker to take request(s)
+       expectEqualWithin(t, time.Second, listLen, func() interface{} { return trashq.CountOutstanding() })
+       expectEqualWithin(t, time.Second, listLen, func() interface{} { return trashq.CountInProgress() })
+
+       // Ensure status.json also reports work is happening
+       assertStatusItem("InProgress", float64(1))
+       assertStatusItem("Outstanding", float64(listLen))
+       assertStatusItem("Queued", float64(listLen-1))
+
+       // Let worker proceed
+       close(gate)
+
+       // Wait for worker to finish
+       expectEqualWithin(t, time.Second, 0, func() interface{} { return trashq.CountOutstanding() })
 
        // Verify Locator1 to be un/deleted as expected
        data, _ := GetBlock(testData.Locator1, false)