// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0

package main

import (
	"container/list"
	"context"
	"time"

	"git.arvados.org/arvados.git/sdk/go/ctxlog"
	"github.com/prometheus/client_golang/prometheus"
	check "gopkg.in/check.v1"
)

// TrashWorkerTestData describes one scenario for
// performTrashWorkerTest: which blocks to create, which locator to
// put in the trash request, and which locators are expected to be
// readable once the trash worker has processed the request.
type TrashWorkerTestData struct {
	// First test block. When CreateData is set, Block1 is written
	// under Locator1 to the first writable mount.
	// NOTE(review): BlockMtime1 is not read by
	// performTrashWorkerTest in this part of the file -- confirm
	// whether it is still needed.
	Locator1    string
	Block1      []byte
	BlockMtime1 int64

	// Second test block. When CreateData is set, Block2 is written
	// under Locator2 to the second writable mount, or to the first
	// one if CreateInVolume1 is set.
	// NOTE(review): BlockMtime2 is likewise not read by
	// performTrashWorkerTest in this part of the file.
	Locator2    string
	Block2      []byte
	BlockMtime2 int64

	// CreateData controls whether any blocks are written before
	// the trash request is queued. CreateInVolume1 places the
	// second block on the first writable mount instead of the
	// second.
	CreateData      bool
	CreateInVolume1 bool

	// Unless UseTrashLifeTime is set, the mock volumes' timestamp
	// for DeleteLocator is back-dated to the trash request's
	// BlockMtime. With DifferentMtimes, each successive mount gets
	// a timestamp one second later than the previous one, and the
	// test additionally checks that Locator1 is readable from
	// exactly one volume at the end.
	UseTrashLifeTime bool
	DifferentMtimes  bool

	// DeleteLocator is the locator named in the trash request.
	// SpecifyMountUUID additionally sets the request's MountUUID
	// to the UUID of the first mount returned by volmgr.Mounts().
	DeleteLocator    string
	SpecifyMountUUID bool

	// Whether GetBlock is expected to succeed for Locator1 and
	// Locator2 after the request has been processed. ExpectLocator2
	// is only checked when Locator2 differs from Locator1.
	ExpectLocator1 bool
	ExpectLocator2 bool
}

// TestTrashWorkerIntegration_GetNonExistingLocator queues a trash
// request for a locator that was never written to any volume
// (CreateData stays false). The worker is expected to finish the
// request, and the block is expected to be unreadable afterwards.
func (s *HandlerSuite) TestTrashWorkerIntegration_GetNonExistingLocator(c *check.C) {
	const absent = "5d41402abc4b2a76b9719d911017c592"

	s.cluster.Collections.BlobTrash = true
	s.performTrashWorkerTest(c, TrashWorkerTestData{
		Locator1: absent, Block1: []byte("hello"),
		Locator2: absent, Block2: []byte("hello"),

		DeleteLocator: absent,

		// CreateData, ExpectLocator1 and ExpectLocator2 are left
		// at their zero value, false.
	})
}

// TestTrashWorkerIntegration_LocatorInVolume1 writes TestHash to the
// first writable volume and TestHash2 to the second, then trashes
// TestHash (the first locator). TestHash is expected to be gone
// (ExpectLocator1 stays false) and TestHash2 to be unaffected.
func (s *HandlerSuite) TestTrashWorkerIntegration_LocatorInVolume1(c *check.C) {
	s.cluster.Collections.BlobTrash = true
	s.performTrashWorkerTest(c, TrashWorkerTestData{
		Locator1: TestHash, Block1: TestBlock,
		Locator2: TestHash2, Block2: TestBlock2,

		CreateData:     true,
		DeleteLocator:  TestHash,
		ExpectLocator2: true,
	})
}

// TestTrashWorkerIntegration_LocatorInVolume2 writes TestHash to the
// first writable volume and TestHash2 to the second, then trashes
// TestHash2 (the second locator). TestHash2 is expected to be gone
// (ExpectLocator2 stays false) and TestHash to be unaffected.
func (s *HandlerSuite) TestTrashWorkerIntegration_LocatorInVolume2(c *check.C) {
	s.cluster.Collections.BlobTrash = true
	s.performTrashWorkerTest(c, TrashWorkerTestData{
		Locator1: TestHash, Block1: TestBlock,
		Locator2: TestHash2, Block2: TestBlock2,

		CreateData:     true,
		DeleteLocator:  TestHash2,
		ExpectLocator1: true,
	})
}

// TestTrashWorkerIntegration_LocatorInBothVolumes writes the same
// block (TestHash) to both writable volumes, back-dated to the same
// mtime as the trash request, and trashes it. The block is expected
// to be unreadable afterwards, i.e. removed from both volumes
// (ExpectLocator1 and ExpectLocator2 stay false).
func (s *HandlerSuite) TestTrashWorkerIntegration_LocatorInBothVolumes(c *check.C) {
	s.cluster.Collections.BlobTrash = true
	s.performTrashWorkerTest(c, TrashWorkerTestData{
		Locator1: TestHash, Block1: TestBlock,
		Locator2: TestHash, Block2: TestBlock,

		CreateData:    true,
		DeleteLocator: TestHash,
	})
}

// TestTrashWorkerIntegration_MtimeMatchesForLocator1ButNotForLocator2
// writes the same block (TestHash) to both writable volumes but, via
// DifferentMtimes, gives the copies different mtimes: the copy on
// the first volume gets the mtime named in the trash request, the
// copy on the second volume is one second newer. After trashing
// TestHash the block is expected to be readable still, and
// performTrashWorkerTest additionally checks that exactly one
// volume still holds it.
func (s *HandlerSuite) TestTrashWorkerIntegration_MtimeMatchesForLocator1ButNotForLocator2(c *check.C) {
	s.cluster.Collections.BlobTrash = true
	s.performTrashWorkerTest(c, TrashWorkerTestData{
		Locator1: TestHash, Block1: TestBlock,
		Locator2: TestHash, Block2: TestBlock,

		CreateData:      true,
		DifferentMtimes: true,
		DeleteLocator:   TestHash,
		ExpectLocator1:  true,
	})
}

// TestTrashWorkerIntegration_SpecifyMountUUID writes the same block
// (TestHash) to both writable volumes with matching mtimes, then
// trashes it with a request whose MountUUID names the first mount.
// The intent is that only the copy on that mount gets deleted, so
// the block is expected to remain readable afterwards.
func (s *HandlerSuite) TestTrashWorkerIntegration_SpecifyMountUUID(c *check.C) {
	s.cluster.Collections.BlobTrash = true
	s.performTrashWorkerTest(c, TrashWorkerTestData{
		Locator1: TestHash, Block1: TestBlock,
		Locator2: TestHash, Block2: TestBlock,

		CreateData:       true,
		DeleteLocator:    TestHash,
		SpecifyMountUUID: true,
		ExpectLocator1:   true,
		ExpectLocator2:   true,
	})
}

// TestTrashWorkerIntegration_TwoDifferentLocatorsInVolume1 writes
// two different blocks (TestHash and TestHash2) to the first
// writable volume and trashes TestHash. TestHash is expected to be
// gone (ExpectLocator1 stays false) while TestHash2 is unaffected.
func (s *HandlerSuite) TestTrashWorkerIntegration_TwoDifferentLocatorsInVolume1(c *check.C) {
	s.cluster.Collections.BlobTrash = true
	s.performTrashWorkerTest(c, TrashWorkerTestData{
		Locator1: TestHash, Block1: TestBlock,
		Locator2: TestHash2, Block2: TestBlock2,

		CreateData:      true,
		CreateInVolume1: true,
		DeleteLocator:   TestHash,
		ExpectLocator2:  true,
	})
}

// TestTrashWorkerIntegration_SameLocatorInTwoVolumesWithDefaultTrashLifeTime
// leaves the freshly written blocks' mtimes alone (UseTrashLifeTime)
// instead of back-dating them. Because the block is then still
// within the trash life time, the request for TestHash must not
// delete anything: both locators are expected to remain readable.
//
// Despite the test's name, the fixture uses two different locators
// (TestHash and TestHash2), both written to the first writable
// volume.
func (s *HandlerSuite) TestTrashWorkerIntegration_SameLocatorInTwoVolumesWithDefaultTrashLifeTime(c *check.C) {
	s.cluster.Collections.BlobTrash = true
	s.performTrashWorkerTest(c, TrashWorkerTestData{
		Locator1: TestHash, Block1: TestBlock,
		Locator2: TestHash2, Block2: TestBlock2,

		CreateData:       true,
		CreateInVolume1:  true,
		UseTrashLifeTime: true,
		DeleteLocator:    TestHash,
		ExpectLocator1:   true,
		ExpectLocator2:   true,
	})
}

// TestTrashWorkerIntegration_DisabledDelete writes the same block
// (TestHash) to both writable volumes with mtimes matching the trash
// request, but runs with Collections.BlobTrash switched off. The
// block is therefore expected to survive the trash request.
func (s *HandlerSuite) TestTrashWorkerIntegration_DisabledDelete(c *check.C) {
	s.cluster.Collections.BlobTrash = false
	s.performTrashWorkerTest(c, TrashWorkerTestData{
		Locator1: TestHash, Block1: TestBlock,
		Locator2: TestHash, Block2: TestBlock,

		CreateData:     true,
		DeleteLocator:  TestHash,
		ExpectLocator1: true,
		ExpectLocator2: true,
	})
}

// performTrashWorkerTest runs the single trash-worker scenario
// described by testData against the suite's handler.
//
// It sets up the handler, swaps in a fresh trash queue, optionally
// writes the test blocks to the writable mounts, and queues one
// TrashRequest for testData.DeleteLocator while the mock volumes are
// gated. It checks the reported queue status before and during
// processing, releases the gate, waits for the worker to finish, and
// finally verifies which locators can still be read.
//
// Failures while preparing the fixture (handler setup, writing the
// test blocks) abort the test immediately; expectation mismatches
// are reported with c.Errorf / c.Check so that all of them show up.
func (s *HandlerSuite) performTrashWorkerTest(c *check.C, testData TrashWorkerTestData) {
	c.Assert(s.handler.setup(context.Background(), s.cluster, "", prometheus.NewRegistry(), testServiceURL), check.IsNil)
	// Replace the router's trashq -- which the worker goroutines
	// started by setup() are now receiving from -- with a new
	// one, so we can see what the handler sends to it.
	trashq := NewWorkQueue()
	s.handler.Handler.(*router).trashq = trashq

	// Put test content
	mounts := s.handler.volmgr.AllWritable()
	if testData.CreateData {
		// put stores a block and a ".meta" companion entry on
		// mounts[idx]. A failed write would otherwise only show
		// up later as a confusing "expected block to be present"
		// error, so stop right here instead.
		put := func(idx int, locator string, block []byte) {
			c.Assert(mounts[idx].Put(context.Background(), locator, block), check.IsNil)
			c.Assert(mounts[idx].Put(context.Background(), locator+".meta", []byte("metadata")), check.IsNil)
		}

		put(0, testData.Locator1, testData.Block1)
		if testData.CreateInVolume1 {
			put(0, testData.Locator2, testData.Block2)
		} else {
			put(1, testData.Locator2, testData.Block2)
		}
	}

	oldBlockTime := time.Now().Add(-s.cluster.Collections.BlobSigningTTL.Duration() - time.Minute)

	// Create TrashRequest for the test
	trashRequest := TrashRequest{
		Locator:    testData.DeleteLocator,
		BlockMtime: oldBlockTime.UnixNano(),
	}
	if testData.SpecifyMountUUID {
		trashRequest.MountUUID = s.handler.volmgr.Mounts()[0].UUID
	}

	// Run trash worker and put the trashRequest on trashq
	trashList := list.New()
	trashList.PushBack(trashRequest)

	if !testData.UseTrashLifeTime {
		// Trash worker would not delete block if its Mtime is
		// within trash life time. Back-date the block to
		// allow the deletion to succeed.
		for _, mnt := range mounts {
			mnt.Volume.(*MockVolume).Timestamps[testData.DeleteLocator] = oldBlockTime
			if testData.DifferentMtimes {
				// Only the first mount keeps the mtime named
				// in the trash request; every later mount is
				// one second newer than the one before.
				oldBlockTime = oldBlockTime.Add(time.Second)
			}
		}
	}
	// NOTE(review): this worker goroutine is never stopped by the
	// test; if the work queue offers a way to shut it down,
	// consider doing so when the test ends.
	go RunTrashWorker(s.handler.volmgr, ctxlog.TestLogger(c), s.cluster, trashq)

	// Install gate so all local operations block until we say go
	gate := make(chan struct{})
	for _, mnt := range mounts {
		mnt.Volume.(*MockVolume).Gate = gate
	}

	// assertStatusItem compares one entry of the handler's
	// reported "TrashQueue" status with the expected value.
	assertStatusItem := func(k string, expect float64) {
		if v := getStatusItem(s.handler, "TrashQueue", k); v != expect {
			c.Errorf("Got %s %v, expected %v", k, v, expect)
		}
	}

	// Nothing has been queued yet.
	assertStatusItem("InProgress", 0)
	assertStatusItem("Queued", 0)

	listLen := trashList.Len()
	trashq.ReplaceQueue(trashList)

	// Wait for worker to take request(s)
	expectEqualWithin(c, time.Second, listLen, func() interface{} { return trashq.Status().InProgress })

	// Ensure status.json also reports work is happening
	assertStatusItem("InProgress", float64(1))
	assertStatusItem("Queued", float64(listLen-1))

	// Let worker proceed
	close(gate)

	// Wait for worker to finish
	expectEqualWithin(c, time.Second, 0, func() interface{} { return trashq.Status().InProgress })

	// Verify Locator1 to be un/deleted as expected
	buf := make([]byte, BlockSize)
	size, err := GetBlock(context.Background(), s.handler.volmgr, testData.Locator1, buf, nil)
	if testData.ExpectLocator1 {
		if size == 0 || err != nil {
			c.Errorf("Expected Locator1 to be still present: %s", testData.Locator1)
		}
	} else {
		if size > 0 || err == nil {
			c.Errorf("Expected Locator1 to be deleted: %s", testData.Locator1)
		}
	}

	// Verify Locator2 to be un/deleted as expected. When both
	// locators are identical the check above already covers it.
	if testData.Locator1 != testData.Locator2 {
		size, err = GetBlock(context.Background(), s.handler.volmgr, testData.Locator2, buf, nil)
		if testData.ExpectLocator2 {
			if size == 0 || err != nil {
				c.Errorf("Expected Locator2 to be still present: %s", testData.Locator2)
			}
		} else {
			if size > 0 || err == nil {
				c.Errorf("Expected Locator2 to be deleted: %s", testData.Locator2)
			}
		}
	}

	// The DifferentMtimes test puts the same locator in two
	// different volumes, but only one copy has an Mtime matching
	// the trash request.
	if testData.DifferentMtimes {
		locatorFoundIn := 0
		for _, volume := range s.handler.volmgr.AllReadable() {
			// The block content is not inspected, so the
			// scratch buffer from above can be reused.
			if _, err := volume.Get(context.Background(), testData.Locator1, buf); err == nil {
				locatorFoundIn++
			}
		}
		c.Check(locatorFoundIn, check.Equals, 1)
	}
}