X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/bd0d76fa76bbb0697c13487bb7d04f0b56d01a29..61dbc4445159c9140b0744bf5526ce0f71f4f374:/services/keepstore/work_queue_test.go diff --git a/services/keepstore/work_queue_test.go b/services/keepstore/work_queue_test.go index 144e4c252b..6b31795293 100644 --- a/services/keepstore/work_queue_test.go +++ b/services/keepstore/work_queue_test.go @@ -2,9 +2,15 @@ package main import ( "container/list" + "runtime" "testing" + "time" ) +type fatalfer interface { + Fatalf(string, ...interface{}) +} + func makeTestWorkList(ary []int) *list.List { l := list.New() for _, n := range ary { @@ -13,40 +19,101 @@ func makeTestWorkList(ary []int) *list.List { return l } -func expectChannelEmpty(t *testing.T, c <-chan interface{}) { +func expectChannelEmpty(t fatalfer, c <-chan interface{}) { select { - case item := <-c: - t.Fatalf("Received value (%v) from channel that we expected to be empty", item) + case item, ok := <-c: + if ok { + t.Fatalf("Received value (%+v) from channel that we expected to be empty", item) + } default: - // no-op } } -func expectChannelNotEmpty(t *testing.T, c <-chan interface{}) { - if item, ok := <-c; !ok { - t.Fatal("expected data on a closed channel") - } else if item == nil { - t.Fatal("expected data on an empty channel") +func expectChannelNotEmpty(t fatalfer, c <-chan interface{}) interface{} { + select { + case item, ok := <-c: + if !ok { + t.Fatalf("expected data on a closed channel") + } + return item + case <-time.After(time.Second): + t.Fatalf("expected data on an empty channel") + return nil } } -func expectChannelClosed(t *testing.T, c <-chan interface{}) { - received, ok := <-c - if ok { - t.Fatalf("Expected channel to be closed, but received %v instead", received) +func expectChannelClosedWithin(t fatalfer, timeout time.Duration, c <-chan interface{}) { + select { + case received, ok := <-c: + if ok { + t.Fatalf("Expected channel to be closed, but received %+v instead", received) + } + case <-time.After(timeout): + t.Fatalf("Expected channel to be closed, but it is still open after %v", timeout) } } -func expectFromChannel(t *testing.T, c <-chan interface{}, expected []int) { +func doWorkItems(t fatalfer, q *WorkQueue, expected []int) { for i := range expected { - actual, ok := <-c - t.Logf("received %v", actual) + actual, ok := <-q.NextItem if !ok { - t.Fatalf("Expected %v but channel was closed after receiving the first %d elements correctly.", expected, i) - } else if actual.(int) != expected[i] { - t.Fatalf("Expected %v but received '%v' after receiving the first %d elements correctly.", expected[i], actual, i) + t.Fatalf("Expected %+v but channel was closed after receiving %+v as expected.", expected, expected[:i]) + } + q.DoneItem <- struct{}{} + if actual.(int) != expected[i] { + t.Fatalf("Expected %+v but received %+v after receiving %+v as expected.", expected[i], actual, expected[:i]) + } + } +} + +func expectEqualWithin(t fatalfer, timeout time.Duration, expect interface{}, f func() interface{}) { + ok := make(chan struct{}) + giveup := false + go func() { + for f() != expect && !giveup { + time.Sleep(time.Millisecond) + } + close(ok) + }() + select { + case <-ok: + case <-time.After(timeout): + giveup = true + _, file, line, _ := runtime.Caller(1) + t.Fatalf("Still getting %+v, timed out waiting for %+v\n%s:%d", f(), expect, file, line) + } +} + +func expectQueued(t fatalfer, b *WorkQueue, expectQueued int) { + if l := b.Status().Queued; l != expectQueued { + t.Fatalf("Got Queued==%d, expected %d", l, expectQueued) + } +} + +func TestWorkQueueDoneness(t *testing.T) { + b := NewWorkQueue() + defer b.Close() + b.ReplaceQueue(makeTestWorkList([]int{1, 2, 3})) + expectQueued(t, b, 3) + gate := make(chan struct{}) + go func() { + <-gate + for range b.NextItem { + <-gate + time.Sleep(time.Millisecond) + b.DoneItem <- struct{}{} } + }() + expectEqualWithin(t, time.Second, 0, func() interface{} { return b.Status().InProgress }) + b.ReplaceQueue(makeTestWorkList([]int{4, 5, 6})) + for i := 1; i <= 3; i++ { + gate <- struct{}{} + expectEqualWithin(t, time.Second, 3-i, func() interface{} { return b.Status().Queued }) + expectEqualWithin(t, time.Second, 1, func() interface{} { return b.Status().InProgress }) } + close(gate) + expectEqualWithin(t, time.Second, 0, func() interface{} { return b.Status().InProgress }) + expectChannelEmpty(t, b.NextItem) } // Create a WorkQueue, generate a list for it, and instantiate a worker. @@ -54,9 +121,12 @@ func TestWorkQueueReadWrite(t *testing.T) { var input = []int{1, 1, 2, 3, 5, 8, 13, 21, 34} b := NewWorkQueue() + expectQueued(t, b, 0) + b.ReplaceQueue(makeTestWorkList(input)) + expectQueued(t, b, len(input)) - expectFromChannel(t, b.NextItem, input) + doWorkItems(t, b, input) expectChannelEmpty(t, b.NextItem) b.Close() } @@ -66,6 +136,7 @@ func TestWorkQueueEarlyRead(t *testing.T) { var input = []int{1, 1, 2, 3, 5, 8, 13, 21, 34} b := NewWorkQueue() + defer b.Close() // First, demonstrate that nothing is available on the NextItem // channel. @@ -76,8 +147,7 @@ func TestWorkQueueEarlyRead(t *testing.T) { // done := make(chan int) go func() { - expectFromChannel(t, b.NextItem, input) - b.Close() + doWorkItems(t, b, input) done <- 1 }() @@ -85,8 +155,29 @@ func TestWorkQueueEarlyRead(t *testing.T) { // finish. b.ReplaceQueue(makeTestWorkList(input)) <-done + expectQueued(t, b, 0) +} - expectChannelClosed(t, b.NextItem) +// After Close(), NextItem closes, work finishes, then stats return zero. +func TestWorkQueueClose(t *testing.T) { + b := NewWorkQueue() + input := []int{1, 2, 3, 4, 5, 6, 7, 8} + mark := make(chan struct{}) + go func() { + <-b.NextItem + mark <- struct{}{} + <-mark + b.DoneItem <- struct{}{} + }() + b.ReplaceQueue(makeTestWorkList(input)) + // Wait for worker to take item 1 + <-mark + b.Close() + expectEqualWithin(t, time.Second, 1, func() interface{} { return b.Status().InProgress }) + // Tell worker to report done + mark <- struct{}{} + expectEqualWithin(t, time.Second, 0, func() interface{} { return b.Status().InProgress }) + expectChannelClosedWithin(t, time.Second, b.NextItem) } // Show that a reader may block when the manager's list is exhausted, @@ -99,10 +190,11 @@ func TestWorkQueueReaderBlocks(t *testing.T) { ) b := NewWorkQueue() + defer b.Close() sendmore := make(chan int) done := make(chan int) go func() { - expectFromChannel(t, b.NextItem, inputBeforeBlock) + doWorkItems(t, b, inputBeforeBlock) // Confirm that the channel is empty, so a subsequent read // on it will block. @@ -110,8 +202,7 @@ func TestWorkQueueReaderBlocks(t *testing.T) { // Signal that we're ready for more input. sendmore <- 1 - expectFromChannel(t, b.NextItem, inputAfterBlock) - b.Close() + doWorkItems(t, b, inputAfterBlock) done <- 1 }() @@ -136,14 +227,14 @@ func TestWorkQueueReplaceQueue(t *testing.T) { // Read just the first five elements from the work list. // Confirm that the channel is not empty. - expectFromChannel(t, b.NextItem, firstInput[0:5]) + doWorkItems(t, b, firstInput[0:5]) expectChannelNotEmpty(t, b.NextItem) // Replace the work list and read five more elements. // The old list should have been discarded and all new // elements come from the new list. b.ReplaceQueue(makeTestWorkList(replaceInput)) - expectFromChannel(t, b.NextItem, replaceInput[0:5]) + doWorkItems(t, b, replaceInput[0:5]) b.Close() }