// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0

package keepstore

import (
	"context"
	"sync"
	"sync/atomic"
	"time"

	"git.arvados.org/arvados.git/sdk/go/arvados"
	"github.com/prometheus/client_golang/prometheus"
)

type TrashListItem struct {
	Locator    string `json:"locator"`
	BlockMtime int64  `json:"block_mtime"`
	MountUUID  string `json:"mount_uuid"` // Target mount, or "" for "everywhere"
}

type trasher struct {
	keepstore  *keepstore
	todo       []TrashListItem
	cond       *sync.Cond // lock guards todo accesses; cond broadcasts when todo becomes non-empty
	inprogress atomic.Int64
}

func newTrasher(ctx context.Context, keepstore *keepstore, reg *prometheus.Registry) *trasher {
	t := &trasher{
		keepstore: keepstore,
		cond:      sync.NewCond(&sync.Mutex{}),
	}
	reg.MustRegister(prometheus.NewGaugeFunc(
		prometheus.GaugeOpts{
			Namespace: "arvados",
			Subsystem: "keepstore",
			Name:      "trash_queue_pending_entries",
			Help:      "Number of queued trash requests",
		},
		func() float64 {
			t.cond.L.Lock()
			defer t.cond.L.Unlock()
			return float64(len(t.todo))
		},
	))
	reg.MustRegister(prometheus.NewGaugeFunc(
		prometheus.GaugeOpts{
			Namespace: "arvados",
			Subsystem: "keepstore",
			Name:      "trash_queue_inprogress_entries",
			Help:      "Number of trash requests in progress",
		},
		func() float64 {
			return float64(t.inprogress.Load())
		},
	))
	if !keepstore.cluster.Collections.BlobTrash {
		keepstore.logger.Info("not running trash worker because Collections.BlobTrash == false")
		return t
	}

	var mntsAllowTrash []*mount
	for _, mnt := range t.keepstore.mounts {
		if mnt.AllowTrash {
			mntsAllowTrash = append(mntsAllowTrash, mnt)
		}
	}
	if len(mntsAllowTrash) == 0 {
		t.keepstore.logger.Info("not running trash worker because there are no writable or trashable volumes")
	} else {
		for i := 0; i < keepstore.cluster.Collections.BlobTrashConcurrency; i++ {
			go t.runWorker(ctx, mntsAllowTrash)
		}
	}
	return t
}

func (t *trasher) SetTrashList(newlist []TrashListItem) {
	t.cond.L.Lock()
	t.todo = newlist
	t.cond.L.Unlock()
	t.cond.Broadcast()
}

func (t *trasher) runWorker(ctx context.Context, mntsAllowTrash []*mount) {
	go func() {
		<-ctx.Done()
		t.cond.Broadcast()
	}()
	for {
		t.cond.L.Lock()
		for len(t.todo) == 0 && ctx.Err() == nil {
			t.cond.Wait()
		}
		if ctx.Err() != nil {
			t.cond.L.Unlock()
			return
		}
		item := t.todo[0]
		t.todo = t.todo[1:]
		t.inprogress.Add(1)
		t.cond.L.Unlock()

		func() {
			defer t.inprogress.Add(-1)
			logger := t.keepstore.logger.WithField("locator", item.Locator)

			li, err := getLocatorInfo(item.Locator)
			if err != nil {
				logger.Warn("ignoring trash request for invalid locator")
				return
			}

			reqMtime := time.Unix(0, item.BlockMtime)
			if time.Since(reqMtime) < t.keepstore.cluster.Collections.BlobSigningTTL.Duration() {
				logger.Warnf("client asked to delete a %v old block (BlockMtime %d = %v), but my blobSignatureTTL is %v! Skipping.",
					arvados.Duration(time.Since(reqMtime)),
					item.BlockMtime,
					reqMtime,
					t.keepstore.cluster.Collections.BlobSigningTTL)
				return
			}

			var mnts []*mount
			if item.MountUUID == "" {
				mnts = mntsAllowTrash
			} else if mnt := t.keepstore.mounts[item.MountUUID]; mnt == nil {
				logger.Warnf("ignoring trash request for nonexistent mount %s", item.MountUUID)
				return
			} else if !mnt.AllowTrash {
				logger.Warnf("ignoring trash request for readonly mount %s with AllowTrashWhenReadOnly==false", item.MountUUID)
				return
			} else {
				mnts = []*mount{mnt}
			}

			for _, mnt := range mnts {
				logger := logger.WithField("mount", mnt.UUID)
				mtime, err := mnt.Mtime(li.hash)
				if err != nil {
					logger.WithError(err).Error("error getting stored mtime")
					continue
				}
				if !mtime.Equal(reqMtime) {
					logger.Infof("stored mtime (%v) does not match trash list mtime (%v); skipping", mtime, reqMtime)
					continue
				}
				err = mnt.BlockTrash(li.hash)
				if err != nil {
					logger.WithError(err).Info("error trashing block")
					continue
				}
				logger.Info("block trashed")
			}
		}()
	}
}

type trashEmptier struct{}

func newTrashEmptier(ctx context.Context, ks *keepstore, reg *prometheus.Registry) *trashEmptier {
	d := ks.cluster.Collections.BlobTrashCheckInterval.Duration()
	if d <= 0 ||
		!ks.cluster.Collections.BlobTrash ||
		ks.cluster.Collections.BlobDeleteConcurrency <= 0 {
		ks.logger.Infof("not running trash emptier because disabled by config (enabled=%t, interval=%v, concurrency=%d)", ks.cluster.Collections.BlobTrash, d, ks.cluster.Collections.BlobDeleteConcurrency)
		return &trashEmptier{}
	}
	go func() {
		ticker := time.NewTicker(d)
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
			}
			for _, mnt := range ks.mounts {
				if mnt.KeepMount.AllowTrash {
					mnt.volume.EmptyTrash()
				}
			}
		}
	}()
	return &trashEmptier{}
}