X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/5ff389fc3450f97b62ef919109d6cc5691c96897..85c625c40ad873d0efac33f8a63c1ee256185e36:/services/keepstore/work_queue_test.go diff --git a/services/keepstore/work_queue_test.go b/services/keepstore/work_queue_test.go index 144e4c252b..254f96cb2d 100644 --- a/services/keepstore/work_queue_test.go +++ b/services/keepstore/work_queue_test.go @@ -1,11 +1,21 @@ -package main +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + +package keepstore import ( "container/list" + "runtime" "testing" + "time" ) -func makeTestWorkList(ary []int) *list.List { +type fatalfer interface { + Fatalf(string, ...interface{}) +} + +func makeTestWorkList(ary []interface{}) *list.List { l := list.New() for _, n := range ary { l.PushBack(n) @@ -13,59 +23,124 @@ func makeTestWorkList(ary []int) *list.List { return l } -func expectChannelEmpty(t *testing.T, c <-chan interface{}) { +func expectChannelEmpty(t fatalfer, c <-chan interface{}) { select { - case item := <-c: - t.Fatalf("Received value (%v) from channel that we expected to be empty", item) + case item, ok := <-c: + if ok { + t.Fatalf("Received value (%+v) from channel that we expected to be empty", item) + } default: - // no-op } } -func expectChannelNotEmpty(t *testing.T, c <-chan interface{}) { - if item, ok := <-c; !ok { - t.Fatal("expected data on a closed channel") - } else if item == nil { - t.Fatal("expected data on an empty channel") +func expectChannelNotEmpty(t fatalfer, c <-chan interface{}) interface{} { + select { + case item, ok := <-c: + if !ok { + t.Fatalf("expected data on a closed channel") + } + return item + case <-time.After(time.Second): + t.Fatalf("expected data on an empty channel") + return nil } } -func expectChannelClosed(t *testing.T, c <-chan interface{}) { - received, ok := <-c - if ok { - t.Fatalf("Expected channel to be closed, but received %v instead", received) +func expectChannelClosedWithin(t fatalfer, timeout time.Duration, c <-chan interface{}) { + select { + case received, ok := <-c: + if ok { + t.Fatalf("Expected channel to be closed, but received %+v instead", received) + } + case <-time.After(timeout): + t.Fatalf("Expected channel to be closed, but it is still open after %v", timeout) } } -func expectFromChannel(t *testing.T, c <-chan interface{}, expected []int) { +func doWorkItems(t fatalfer, q *WorkQueue, expected []interface{}) { for i := range expected { - actual, ok := <-c - t.Logf("received %v", actual) + actual, ok := <-q.NextItem if !ok { - t.Fatalf("Expected %v but channel was closed after receiving the first %d elements correctly.", expected, i) - } else if actual.(int) != expected[i] { - t.Fatalf("Expected %v but received '%v' after receiving the first %d elements correctly.", expected[i], actual, i) + t.Fatalf("Expected %+v but channel was closed after receiving %+v as expected.", expected, expected[:i]) + } + q.DoneItem <- struct{}{} + if actual.(int) != expected[i] { + t.Fatalf("Expected %+v but received %+v after receiving %+v as expected.", expected[i], actual, expected[:i]) } } } +func expectEqualWithin(t fatalfer, timeout time.Duration, expect interface{}, f func() interface{}) { + ok := make(chan struct{}) + giveup := false + go func() { + for f() != expect && !giveup { + time.Sleep(time.Millisecond) + } + close(ok) + }() + select { + case <-ok: + case <-time.After(timeout): + giveup = true + _, file, line, _ := runtime.Caller(1) + t.Fatalf("Still getting %+v, timed out waiting for %+v\n%s:%d", f(), expect, file, line) + } +} + +func expectQueued(t fatalfer, b *WorkQueue, expectQueued int) { + if l := b.Status().Queued; l != expectQueued { + t.Fatalf("Got Queued==%d, expected %d", l, expectQueued) + } +} + +func TestWorkQueueDoneness(t *testing.T) { + b := NewWorkQueue() + defer b.Close() + b.ReplaceQueue(makeTestWorkList([]interface{}{1, 2, 3})) + expectQueued(t, b, 3) + gate := make(chan struct{}) + go func() { + <-gate + for range b.NextItem { + <-gate + time.Sleep(time.Millisecond) + b.DoneItem <- struct{}{} + } + }() + expectEqualWithin(t, time.Second, 0, func() interface{} { return b.Status().InProgress }) + b.ReplaceQueue(makeTestWorkList([]interface{}{4, 5, 6})) + for i := 1; i <= 3; i++ { + gate <- struct{}{} + expectEqualWithin(t, time.Second, 3-i, func() interface{} { return b.Status().Queued }) + expectEqualWithin(t, time.Second, 1, func() interface{} { return b.Status().InProgress }) + } + close(gate) + expectEqualWithin(t, time.Second, 0, func() interface{} { return b.Status().InProgress }) + expectChannelEmpty(t, b.NextItem) +} + // Create a WorkQueue, generate a list for it, and instantiate a worker. func TestWorkQueueReadWrite(t *testing.T) { - var input = []int{1, 1, 2, 3, 5, 8, 13, 21, 34} + var input = []interface{}{1, 1, 2, 3, 5, 8, 13, 21, 34} b := NewWorkQueue() + expectQueued(t, b, 0) + b.ReplaceQueue(makeTestWorkList(input)) + expectQueued(t, b, len(input)) - expectFromChannel(t, b.NextItem, input) + doWorkItems(t, b, input) expectChannelEmpty(t, b.NextItem) b.Close() } // Start a worker before the list has any input. func TestWorkQueueEarlyRead(t *testing.T) { - var input = []int{1, 1, 2, 3, 5, 8, 13, 21, 34} + var input = []interface{}{1, 1, 2, 3, 5, 8, 13, 21, 34} b := NewWorkQueue() + defer b.Close() // First, demonstrate that nothing is available on the NextItem // channel. @@ -76,8 +151,7 @@ func TestWorkQueueEarlyRead(t *testing.T) { // done := make(chan int) go func() { - expectFromChannel(t, b.NextItem, input) - b.Close() + doWorkItems(t, b, input) done <- 1 }() @@ -85,8 +159,29 @@ func TestWorkQueueEarlyRead(t *testing.T) { // finish. b.ReplaceQueue(makeTestWorkList(input)) <-done + expectQueued(t, b, 0) +} - expectChannelClosed(t, b.NextItem) +// After Close(), NextItem closes, work finishes, then stats return zero. +func TestWorkQueueClose(t *testing.T) { + b := NewWorkQueue() + input := []interface{}{1, 2, 3, 4, 5, 6, 7, 8} + mark := make(chan struct{}) + go func() { + <-b.NextItem + mark <- struct{}{} + <-mark + b.DoneItem <- struct{}{} + }() + b.ReplaceQueue(makeTestWorkList(input)) + // Wait for worker to take item 1 + <-mark + b.Close() + expectEqualWithin(t, time.Second, 1, func() interface{} { return b.Status().InProgress }) + // Tell worker to report done + mark <- struct{}{} + expectEqualWithin(t, time.Second, 0, func() interface{} { return b.Status().InProgress }) + expectChannelClosedWithin(t, time.Second, b.NextItem) } // Show that a reader may block when the manager's list is exhausted, @@ -94,15 +189,16 @@ func TestWorkQueueEarlyRead(t *testing.T) { // available. func TestWorkQueueReaderBlocks(t *testing.T) { var ( - inputBeforeBlock = []int{1, 2, 3, 4, 5} - inputAfterBlock = []int{6, 7, 8, 9, 10} + inputBeforeBlock = []interface{}{1, 2, 3, 4, 5} + inputAfterBlock = []interface{}{6, 7, 8, 9, 10} ) b := NewWorkQueue() + defer b.Close() sendmore := make(chan int) done := make(chan int) go func() { - expectFromChannel(t, b.NextItem, inputBeforeBlock) + doWorkItems(t, b, inputBeforeBlock) // Confirm that the channel is empty, so a subsequent read // on it will block. @@ -110,8 +206,7 @@ func TestWorkQueueReaderBlocks(t *testing.T) { // Signal that we're ready for more input. sendmore <- 1 - expectFromChannel(t, b.NextItem, inputAfterBlock) - b.Close() + doWorkItems(t, b, inputAfterBlock) done <- 1 }() @@ -128,22 +223,22 @@ func TestWorkQueueReaderBlocks(t *testing.T) { // Replace one active work list with another. func TestWorkQueueReplaceQueue(t *testing.T) { - var firstInput = []int{1, 1, 2, 3, 5, 8, 13, 21, 34} - var replaceInput = []int{1, 4, 9, 16, 25, 36, 49, 64, 81} + var firstInput = []interface{}{1, 1, 2, 3, 5, 8, 13, 21, 34} + var replaceInput = []interface{}{1, 4, 9, 16, 25, 36, 49, 64, 81} b := NewWorkQueue() b.ReplaceQueue(makeTestWorkList(firstInput)) // Read just the first five elements from the work list. // Confirm that the channel is not empty. - expectFromChannel(t, b.NextItem, firstInput[0:5]) + doWorkItems(t, b, firstInput[0:5]) expectChannelNotEmpty(t, b.NextItem) // Replace the work list and read five more elements. // The old list should have been discarded and all new // elements come from the new list. b.ReplaceQueue(makeTestWorkList(replaceInput)) - expectFromChannel(t, b.NextItem, replaceInput[0:5]) + doWorkItems(t, b, replaceInput[0:5]) b.Close() }