// Copyright (C) The Arvados Authors. All rights reserved. // // SPDX-License-Identifier: AGPL-3.0 package keepstore import ( "container/list" "runtime" "testing" "time" ) type fatalfer interface { Fatalf(string, ...interface{}) } func makeTestWorkList(ary []interface{}) *list.List { l := list.New() for _, n := range ary { l.PushBack(n) } return l } func expectChannelEmpty(t fatalfer, c <-chan interface{}) { select { case item, ok := <-c: if ok { t.Fatalf("Received value (%+v) from channel that we expected to be empty", item) } default: } } func expectChannelNotEmpty(t fatalfer, c <-chan interface{}) interface{} { select { case item, ok := <-c: if !ok { t.Fatalf("expected data on a closed channel") } return item case <-time.After(time.Second): t.Fatalf("expected data on an empty channel") return nil } } func expectChannelClosedWithin(t fatalfer, timeout time.Duration, c <-chan interface{}) { select { case received, ok := <-c: if ok { t.Fatalf("Expected channel to be closed, but received %+v instead", received) } case <-time.After(timeout): t.Fatalf("Expected channel to be closed, but it is still open after %v", timeout) } } func doWorkItems(t fatalfer, q *WorkQueue, expected []interface{}) { for i := range expected { actual, ok := <-q.NextItem if !ok { t.Fatalf("Expected %+v but channel was closed after receiving %+v as expected.", expected, expected[:i]) } q.DoneItem <- struct{}{} if actual.(int) != expected[i] { t.Fatalf("Expected %+v but received %+v after receiving %+v as expected.", expected[i], actual, expected[:i]) } } } func expectEqualWithin(t fatalfer, timeout time.Duration, expect interface{}, f func() interface{}) { ok := make(chan struct{}) giveup := false go func() { for f() != expect && !giveup { time.Sleep(time.Millisecond) } close(ok) }() select { case <-ok: case <-time.After(timeout): giveup = true _, file, line, _ := runtime.Caller(1) t.Fatalf("Still getting %+v, timed out waiting for %+v\n%s:%d", f(), expect, file, line) } } func expectQueued(t fatalfer, b *WorkQueue, expectQueued int) { if l := b.Status().Queued; l != expectQueued { t.Fatalf("Got Queued==%d, expected %d", l, expectQueued) } } func TestWorkQueueDoneness(t *testing.T) { b := NewWorkQueue() defer b.Close() b.ReplaceQueue(makeTestWorkList([]interface{}{1, 2, 3})) expectQueued(t, b, 3) gate := make(chan struct{}) go func() { <-gate for range b.NextItem { <-gate time.Sleep(time.Millisecond) b.DoneItem <- struct{}{} } }() expectEqualWithin(t, time.Second, 0, func() interface{} { return b.Status().InProgress }) b.ReplaceQueue(makeTestWorkList([]interface{}{4, 5, 6})) for i := 1; i <= 3; i++ { gate <- struct{}{} expectEqualWithin(t, time.Second, 3-i, func() interface{} { return b.Status().Queued }) expectEqualWithin(t, time.Second, 1, func() interface{} { return b.Status().InProgress }) } close(gate) expectEqualWithin(t, time.Second, 0, func() interface{} { return b.Status().InProgress }) expectChannelEmpty(t, b.NextItem) } // Create a WorkQueue, generate a list for it, and instantiate a worker. func TestWorkQueueReadWrite(t *testing.T) { var input = []interface{}{1, 1, 2, 3, 5, 8, 13, 21, 34} b := NewWorkQueue() expectQueued(t, b, 0) b.ReplaceQueue(makeTestWorkList(input)) expectQueued(t, b, len(input)) doWorkItems(t, b, input) expectChannelEmpty(t, b.NextItem) b.Close() } // Start a worker before the list has any input. func TestWorkQueueEarlyRead(t *testing.T) { var input = []interface{}{1, 1, 2, 3, 5, 8, 13, 21, 34} b := NewWorkQueue() defer b.Close() // First, demonstrate that nothing is available on the NextItem // channel. expectChannelEmpty(t, b.NextItem) // Start a reader in a goroutine. The reader will block until the // block work list has been initialized. // done := make(chan int) go func() { doWorkItems(t, b, input) done <- 1 }() // Feed the blocklist a new worklist, and wait for the worker to // finish. b.ReplaceQueue(makeTestWorkList(input)) <-done expectQueued(t, b, 0) } // After Close(), NextItem closes, work finishes, then stats return zero. func TestWorkQueueClose(t *testing.T) { b := NewWorkQueue() input := []interface{}{1, 2, 3, 4, 5, 6, 7, 8} mark := make(chan struct{}) go func() { <-b.NextItem mark <- struct{}{} <-mark b.DoneItem <- struct{}{} }() b.ReplaceQueue(makeTestWorkList(input)) // Wait for worker to take item 1 <-mark b.Close() expectEqualWithin(t, time.Second, 1, func() interface{} { return b.Status().InProgress }) // Tell worker to report done mark <- struct{}{} expectEqualWithin(t, time.Second, 0, func() interface{} { return b.Status().InProgress }) expectChannelClosedWithin(t, time.Second, b.NextItem) } // Show that a reader may block when the manager's list is exhausted, // and that the reader resumes automatically when new data is // available. func TestWorkQueueReaderBlocks(t *testing.T) { var ( inputBeforeBlock = []interface{}{1, 2, 3, 4, 5} inputAfterBlock = []interface{}{6, 7, 8, 9, 10} ) b := NewWorkQueue() defer b.Close() sendmore := make(chan int) done := make(chan int) go func() { doWorkItems(t, b, inputBeforeBlock) // Confirm that the channel is empty, so a subsequent read // on it will block. expectChannelEmpty(t, b.NextItem) // Signal that we're ready for more input. sendmore <- 1 doWorkItems(t, b, inputAfterBlock) done <- 1 }() // Write a slice of the first five elements and wait for the // reader to signal that it's ready for us to send more input. b.ReplaceQueue(makeTestWorkList(inputBeforeBlock)) <-sendmore b.ReplaceQueue(makeTestWorkList(inputAfterBlock)) // Wait for the reader to complete. <-done } // Replace one active work list with another. func TestWorkQueueReplaceQueue(t *testing.T) { var firstInput = []interface{}{1, 1, 2, 3, 5, 8, 13, 21, 34} var replaceInput = []interface{}{1, 4, 9, 16, 25, 36, 49, 64, 81} b := NewWorkQueue() b.ReplaceQueue(makeTestWorkList(firstInput)) // Read just the first five elements from the work list. // Confirm that the channel is not empty. doWorkItems(t, b, firstInput[0:5]) expectChannelNotEmpty(t, b.NextItem) // Replace the work list and read five more elements. // The old list should have been discarded and all new // elements come from the new list. b.ReplaceQueue(makeTestWorkList(replaceInput)) doWorkItems(t, b, replaceInput[0:5]) b.Close() }