// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0

package keepstore

import (
	"bytes"
	"context"
	"crypto/md5"
	"errors"
	"fmt"
	"io"
	"os"
	"strings"
	"sync"
	"time"

	"git.arvados.org/arvados.git/sdk/go/arvados"
	"github.com/sirupsen/logrus"
)

// Fixture data shared by keepstore tests. Each TestHash* value is the
// 32-hex-digit locator hash that tests pair with the block of the
// same suffix.
var (
	// TestHashPutResp is TestHash followed by "+", the length of
	// TestBlock in bytes, and a newline.
	TestBlock       = []byte("The quick brown fox jumps over the lazy dog.")
	TestHash        = "e4d909c290d0fb1ca068ffaddf22cbd0"
	TestHashPutResp = TestHash + "+44\n"

	TestBlock2 = []byte("Pack my box with five dozen liquor jugs.")
	TestHash2  = "f15ac516f788aec4f30932ffb6395c39"

	TestBlock3 = []byte("Now is the time for all good men to come to the aid of their country.")
	TestHash3  = "eed29bbffbc2dbe5e5ee0bb71888e61f"

	// BadBlock stands in for colliding or corrupted content. Its
	// hash must differ from every TestHash* value above.
	BadBlock = []byte("The magic words are squeamish ossifrage.")

	// EmptyHash is the locator hash paired with the zero-length
	// block EmptyBlock.
	EmptyHash  = "d41d8cd98f00b204e9800998ecf8427e"
	EmptyBlock = []byte{}
)

// A TestableVolume allows test suites to manipulate the state of an
// underlying Volume, in order to test behavior in cases that are
// impractical to achieve with a sequence of normal Volume operations.
//
// It embeds Volume, so an implementation must provide every Volume
// method in addition to the test-only hooks declared here.
type TestableVolume interface {
	Volume

	// PutRaw writes (or overwrites) the content stored for
	// locator with data, bypassing all constraints like readonly
	// and serialize.
	PutRaw(locator string, data []byte)

	// ReadWriteOperationLabelValues returns the strings that a
	// driver uses to record read (r) and write (w) operations.
	ReadWriteOperationLabelValues() (r, w string)

	// TouchWithDate specifies the value Mtime() should return
	// for locator, until the next call to Touch, TouchWithDate,
	// or Put.
	TouchWithDate(locator string, lastPut time.Time)

	// Teardown cleans up and deletes temporary files.
	Teardown()
}

// init registers newMockVolume in the driver table under the key
// "mock".
func init() {
	driver["mock"] = newMockVolume
}

// MockVolume is a test double for Volume, used to test handlers. It
// keeps block data and timestamps in in-memory maps.
type MockVolume struct {
	// Store maps each locator to the block data stored under it.
	// Timestamps maps each locator to the time most recently
	// recorded for it by Touch or TouchWithDate.
	Store      map[string][]byte
	Timestamps map[string]time.Time

	// When Bad is true, Compare, Get, Put and Mtime return
	// BadVolumeError instead of doing any work. The other
	// methods do not consult Bad.
	Bad            bool
	BadVolumeError error

	// Touchable volumes' Touch() method succeeds for a locator
	// that has been Put(). When Touchable is false, Touch of a
	// stored locator returns a "Touch failed" error.
	Touchable bool

	// Gate is a "starting gate", allowing test cases to pause
	// volume operations long enough to inspect state. Compare,
	// Get, Put, Touch, TouchWithDate, Mtime, IndexTo and Trash
	// each start by receiving from Gate; Put receives a second
	// time because it finishes by calling Touch. Status,
	// Untrash, EmptyTrash, GetDeviceID, GetStorageClasses and
	// String never receive from Gate. Sending one value
	// unblocks one receive; closing the channel unblocks all
	// operations. newMockVolume sets Gate to a closed channel,
	// so by default all operations proceed without blocking.
	// See trash_worker_test.go for an example.
	Gate chan struct{} `json:"-"`

	// cluster and volume are the configuration values passed to
	// newMockVolume: Trash reads
	// cluster.Collections.BlobSigningTTL, and Put,
	// TouchWithDate and Trash refuse to run when volume.ReadOnly
	// is set. logger and metrics are stored as given. called
	// counts invocations per method label (see gotCall and
	// CallCount) and is guarded by mutex.
	cluster *arvados.Cluster
	volume  arvados.Volume
	logger  logrus.FieldLogger
	metrics *volumeMetricsVecs
	called  map[string]int
	mutex   sync.Mutex
}

// newMockVolume builds an empty MockVolume that is not Bad, is
// Touchable, and whose Gate is already closed so that no operation
// blocks. Whether the result is read-only depends on volume.ReadOnly.
// The returned error is always nil.
func newMockVolume(cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) (Volume, error) {
	mv := &MockVolume{
		Store:      map[string][]byte{},
		Timestamps: map[string]time.Time{},
		Touchable:  true,
		Gate:       make(chan struct{}),
		called:     make(map[string]int),
		cluster:    cluster,
		volume:     volume,
		logger:     logger,
		metrics:    metrics,
	}
	// A closed gate lets every receive in the volume methods
	// return immediately.
	close(mv.Gate)
	return mv, nil
}

// CallCount reports how many times gotCall has recorded the given
// method label. A label that was never recorded yields 0.
func (v *MockVolume) CallCount(method string) int {
	v.mutex.Lock()
	defer v.mutex.Unlock()
	// Indexing a map with a missing key yields the zero value, so
	// no explicit presence check is needed.
	return v.called[method]
}

// gotCall adds one to the invocation counter kept for the given
// method label, holding mutex while the counter map is updated.
func (v *MockVolume) gotCall(method string) {
	v.mutex.Lock()
	defer v.mutex.Unlock()
	// A missing key reads as 0, so the first call stores 1.
	v.called[method]++
}

// Compare checks buf against the block stored under loc.
//
// It returns BadVolumeError if the volume is Bad, os.ErrNotExist if
// nothing is stored under loc, DiskHashError if the hex MD5 of the
// stored data differs from loc, CollisionError if the stored data
// differs from buf, and nil if they are identical. ctx is not
// consulted.
func (v *MockVolume) Compare(ctx context.Context, loc string, buf []byte) error {
	v.gotCall("Compare")
	<-v.Gate
	if v.Bad {
		return v.BadVolumeError
	}
	stored, found := v.Store[loc]
	if !found {
		return os.ErrNotExist
	}
	// Verify the stored copy first, so corruption of the stored
	// data is reported in preference to a mismatch with buf.
	if digest := fmt.Sprintf("%x", md5.Sum(stored)); digest != loc {
		return DiskHashError
	}
	if !bytes.Equal(buf, stored) {
		return CollisionError
	}
	return nil
}

// Get copies the block stored under loc into buf and returns the
// number of bytes copied.
//
// It returns BadVolumeError if the volume is Bad, os.ErrNotExist if
// nothing is stored under loc, and io.ErrShortBuffer if the stored
// block is longer than len(buf). ctx is not consulted.
func (v *MockVolume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
	v.gotCall("Get")
	<-v.Gate
	if v.Bad {
		return 0, v.BadVolumeError
	}
	block, ok := v.Store[loc]
	if !ok {
		return 0, os.ErrNotExist
	}
	// Re-slicing buf to the block length would panic when the
	// block exceeds cap(buf), and would write past len(buf) when
	// it only fits in spare capacity. Report the short buffer
	// instead.
	if len(block) > len(buf) {
		return 0, io.ErrShortBuffer
	}
	return copy(buf, block), nil
}

// Put stores a copy of block under loc and then calls Touch to
// record the current time for it.
//
// It returns BadVolumeError if the volume is Bad and
// MethodDisabledError if the volume is read-only; otherwise it
// returns whatever Touch returns. If Touch fails (for example when
// the volume is not Touchable), the block remains stored. Because
// Touch is invoked, a successful Put also increments the "Touch"
// call counter and receives from Gate a second time. ctx is not
// consulted.
func (v *MockVolume) Put(ctx context.Context, loc string, block []byte) error {
	v.gotCall("Put")
	<-v.Gate
	if v.Bad {
		return v.BadVolumeError
	}
	if v.volume.ReadOnly {
		return MethodDisabledError
	}
	// Keep a private copy: storing the caller's slice directly
	// would let later reuse of the caller's buffer silently change
	// the stored block.
	stored := make([]byte, len(block))
	copy(stored, block)
	v.Store[loc] = stored
	return v.Touch(loc)
}

// Touch sets the timestamp recorded for loc to the current time by
// delegating to TouchWithDate, and returns its result.
func (v *MockVolume) Touch(loc string) error {
	return v.TouchWithDate(loc, time.Now())
}

// TouchWithDate records t as the timestamp for loc. The call is
// counted under the label "Touch".
//
// It returns MethodDisabledError if the volume is read-only,
// os.ErrNotExist if nothing is stored under loc, and a "Touch failed"
// error if the volume is not Touchable. The Bad flag is not consulted.
func (v *MockVolume) TouchWithDate(loc string, t time.Time) error {
	v.gotCall("Touch")
	<-v.Gate
	if v.volume.ReadOnly {
		return MethodDisabledError
	}
	_, stored := v.Store[loc]
	switch {
	case !stored:
		return os.ErrNotExist
	case !v.Touchable:
		return errors.New("Touch failed")
	}
	v.Timestamps[loc] = t
	return nil
}

// Mtime returns the timestamp recorded for loc.
//
// On a Bad volume it returns the zero time together with
// BadVolumeError. If no timestamp is recorded for loc it returns the
// zero time and os.ErrNotExist.
func (v *MockVolume) Mtime(loc string) (time.Time, error) {
	v.gotCall("Mtime")
	<-v.Gate
	if v.Bad {
		return time.Time{}, v.BadVolumeError
	}
	recorded, known := v.Timestamps[loc]
	if !known {
		return time.Time{}, os.ErrNotExist
	}
	return recorded, nil
}

// IndexTo writes one line to w for every stored block whose locator
// passes IsValidLocator and begins with prefix. Each line has the
// form "<locator>+<size> 123456789\n": the timestamp column is a
// fixed placeholder rather than a value from Timestamps. Lines are
// emitted in map iteration order. The first write error stops the
// listing and is returned.
func (v *MockVolume) IndexTo(prefix string, w io.Writer) error {
	v.gotCall("IndexTo")
	<-v.Gate
	for loc, block := range v.Store {
		if IsValidLocator(loc) && strings.HasPrefix(loc, prefix) {
			if _, err := fmt.Fprintf(w, "%s+%d %d\n", loc, len(block), 123456789); err != nil {
				return err
			}
		}
	}
	return nil
}

// Trash removes the block stored under loc from Store, unless its
// recorded timestamp is more recent than the cluster's
// BlobSigningTTL, in which case the block is kept and nil is
// returned. The call is counted under the label "Delete". The entry
// in Timestamps is left in place.
//
// It returns MethodDisabledError if the volume is read-only and
// os.ErrNotExist if nothing is stored under loc.
func (v *MockVolume) Trash(loc string) error {
	v.gotCall("Delete")
	<-v.Gate
	if v.volume.ReadOnly {
		return MethodDisabledError
	}
	if _, found := v.Store[loc]; !found {
		return os.ErrNotExist
	}
	ttl := time.Duration(v.cluster.Collections.BlobSigningTTL)
	if time.Since(v.Timestamps[loc]) >= ttl {
		delete(v.Store, loc)
	}
	return nil
}

// GetDeviceID returns the fixed string "mock-device-id" for every
// MockVolume.
func (v *MockVolume) GetDeviceID() string {
	return "mock-device-id"
}

// Untrash is a no-op: it does not touch Store or the call counters
// and always returns nil.
func (v *MockVolume) Untrash(loc string) error {
	return nil
}

// Status returns a VolumeStatus built from fixed placeholder values
// ("/bogo", 123) plus the free and used byte counts. Used is the
// total length of all stored blocks; free is a nominal capacity of
// 1000000 bytes minus used, clamped at zero. Status does not receive
// from Gate and is not recorded in the call counters.
func (v *MockVolume) Status() *VolumeStatus {
	const capacity uint64 = 1000000
	var used uint64
	for _, block := range v.Store {
		used += uint64(len(block))
	}
	// Unsigned subtraction wraps around, so storing more than the
	// nominal capacity would otherwise report an enormous amount of
	// free space.
	var free uint64
	if used < capacity {
		free = capacity - used
	}
	return &VolumeStatus{"/bogo", 123, free, used}
}

// String returns the fixed description "[MockVolume]".
func (v *MockVolume) String() string {
	return "[MockVolume]"
}

// EmptyTrash is a no-op.
func (v *MockVolume) EmptyTrash() {
}

// GetStorageClasses always returns nil.
func (v *MockVolume) GetStorageClasses() []string {
	return nil
}