// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0

package keepstore

import (
	"container/list"
	"runtime"
	"testing"
	"time"
)

type fatalfer interface {
	Fatalf(string, ...interface{})
}

func makeTestWorkList(ary []interface{}) *list.List {
	l := list.New()
	for _, n := range ary {
		l.PushBack(n)
	}
	return l
}

func expectChannelEmpty(t fatalfer, c <-chan interface{}) {
	select {
	case item, ok := <-c:
		if ok {
			t.Fatalf("Received value (%+v) from channel that we expected to be empty", item)
		}
	default:
	}
}

func expectChannelNotEmpty(t fatalfer, c <-chan interface{}) interface{} {
	select {
	case item, ok := <-c:
		if !ok {
			t.Fatalf("expected data on a closed channel")
		}
		return item
	case <-time.After(time.Second):
		t.Fatalf("expected data on an empty channel")
		return nil
	}
}

func expectChannelClosedWithin(t fatalfer, timeout time.Duration, c <-chan interface{}) {
	select {
	case received, ok := <-c:
		if ok {
			t.Fatalf("Expected channel to be closed, but received %+v instead", received)
		}
	case <-time.After(timeout):
		t.Fatalf("Expected channel to be closed, but it is still open after %v", timeout)
	}
}

func doWorkItems(t fatalfer, q *WorkQueue, expected []interface{}) {
	for i := range expected {
		actual, ok := <-q.NextItem
		if !ok {
			t.Fatalf("Expected %+v but channel was closed after receiving %+v as expected.", expected, expected[:i])
		}
		q.DoneItem <- struct{}{}
		if actual.(int) != expected[i] {
			t.Fatalf("Expected %+v but received %+v after receiving %+v as expected.", expected[i], actual, expected[:i])
		}
	}
}

func expectEqualWithin(t fatalfer, timeout time.Duration, expect interface{}, f func() interface{}) {
	ok := make(chan struct{})
	giveup := false
	go func() {
		for f() != expect && !giveup {
			time.Sleep(time.Millisecond)
		}
		close(ok)
	}()
	select {
	case <-ok:
	case <-time.After(timeout):
		giveup = true
		_, file, line, _ := runtime.Caller(1)
		t.Fatalf("Still getting %+v, timed out waiting for %+v\n%s:%d", f(), expect, file, line)
	}
}

func expectQueued(t fatalfer, b *WorkQueue, expectQueued int) {
	if l := b.Status().Queued; l != expectQueued {
		t.Fatalf("Got Queued==%d, expected %d", l, expectQueued)
	}
}

func TestWorkQueueDoneness(t *testing.T) {
	b := NewWorkQueue()
	defer b.Close()
	b.ReplaceQueue(makeTestWorkList([]interface{}{1, 2, 3}))
	expectQueued(t, b, 3)
	gate := make(chan struct{})
	go func() {
		<-gate
		for range b.NextItem {
			<-gate
			time.Sleep(time.Millisecond)
			b.DoneItem <- struct{}{}
		}
	}()
	expectEqualWithin(t, time.Second, 0, func() interface{} { return b.Status().InProgress })
	b.ReplaceQueue(makeTestWorkList([]interface{}{4, 5, 6}))
	for i := 1; i <= 3; i++ {
		gate <- struct{}{}
		expectEqualWithin(t, time.Second, 3-i, func() interface{} { return b.Status().Queued })
		expectEqualWithin(t, time.Second, 1, func() interface{} { return b.Status().InProgress })
	}
	close(gate)
	expectEqualWithin(t, time.Second, 0, func() interface{} { return b.Status().InProgress })
	expectChannelEmpty(t, b.NextItem)
}

// Create a WorkQueue, generate a list for it, and instantiate a worker.
func TestWorkQueueReadWrite(t *testing.T) {
	var input = []interface{}{1, 1, 2, 3, 5, 8, 13, 21, 34}

	b := NewWorkQueue()
	expectQueued(t, b, 0)

	b.ReplaceQueue(makeTestWorkList(input))
	expectQueued(t, b, len(input))

	doWorkItems(t, b, input)
	expectChannelEmpty(t, b.NextItem)
	b.Close()
}

// Start a worker before the list has any input.
func TestWorkQueueEarlyRead(t *testing.T) {
	var input = []interface{}{1, 1, 2, 3, 5, 8, 13, 21, 34}

	b := NewWorkQueue()
	defer b.Close()

	// First, demonstrate that nothing is available on the NextItem
	// channel.
	expectChannelEmpty(t, b.NextItem)

	// Start a reader in a goroutine. The reader will block until the
	// block work list has been initialized.
	//
	done := make(chan int)
	go func() {
		doWorkItems(t, b, input)
		done <- 1
	}()

	// Feed the blocklist a new worklist, and wait for the worker to
	// finish.
	b.ReplaceQueue(makeTestWorkList(input))
	<-done
	expectQueued(t, b, 0)
}

// After Close(), NextItem closes, work finishes, then stats return zero.
func TestWorkQueueClose(t *testing.T) {
	b := NewWorkQueue()
	input := []interface{}{1, 2, 3, 4, 5, 6, 7, 8}
	mark := make(chan struct{})
	go func() {
		<-b.NextItem
		mark <- struct{}{}
		<-mark
		b.DoneItem <- struct{}{}
	}()
	b.ReplaceQueue(makeTestWorkList(input))
	// Wait for worker to take item 1
	<-mark
	b.Close()
	expectEqualWithin(t, time.Second, 1, func() interface{} { return b.Status().InProgress })
	// Tell worker to report done
	mark <- struct{}{}
	expectEqualWithin(t, time.Second, 0, func() interface{} { return b.Status().InProgress })
	expectChannelClosedWithin(t, time.Second, b.NextItem)
}

// Show that a reader may block when the manager's list is exhausted,
// and that the reader resumes automatically when new data is
// available.
func TestWorkQueueReaderBlocks(t *testing.T) {
	var (
		inputBeforeBlock = []interface{}{1, 2, 3, 4, 5}
		inputAfterBlock  = []interface{}{6, 7, 8, 9, 10}
	)

	b := NewWorkQueue()
	defer b.Close()
	sendmore := make(chan int)
	done := make(chan int)
	go func() {
		doWorkItems(t, b, inputBeforeBlock)

		// Confirm that the channel is empty, so a subsequent read
		// on it will block.
		expectChannelEmpty(t, b.NextItem)

		// Signal that we're ready for more input.
		sendmore <- 1
		doWorkItems(t, b, inputAfterBlock)
		done <- 1
	}()

	// Write a slice of the first five elements and wait for the
	// reader to signal that it's ready for us to send more input.
	b.ReplaceQueue(makeTestWorkList(inputBeforeBlock))
	<-sendmore

	b.ReplaceQueue(makeTestWorkList(inputAfterBlock))

	// Wait for the reader to complete.
	<-done
}

// Replace one active work list with another.
func TestWorkQueueReplaceQueue(t *testing.T) {
	var firstInput = []interface{}{1, 1, 2, 3, 5, 8, 13, 21, 34}
	var replaceInput = []interface{}{1, 4, 9, 16, 25, 36, 49, 64, 81}

	b := NewWorkQueue()
	b.ReplaceQueue(makeTestWorkList(firstInput))

	// Read just the first five elements from the work list.
	// Confirm that the channel is not empty.
	doWorkItems(t, b, firstInput[0:5])
	expectChannelNotEmpty(t, b.NextItem)

	// Replace the work list and read five more elements.
	// The old list should have been discarded and all new
	// elements come from the new list.
	b.ReplaceQueue(makeTestWorkList(replaceInput))
	doWorkItems(t, b, replaceInput[0:5])

	b.Close()
}