X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/4de0af809ffbef43d89cd1751e5d611a4b5445e9..75e00445b6de230493e9ee37483dd4c469db29b1:/services/keepstore/work_queue_test.go diff --git a/services/keepstore/work_queue_test.go b/services/keepstore/work_queue_test.go index df0fa9c612..8893fb9406 100644 --- a/services/keepstore/work_queue_test.go +++ b/services/keepstore/work_queue_test.go @@ -11,7 +11,7 @@ type fatalfer interface { Fatalf(string, ...interface{}) } -func makeTestWorkList(ary []int) *list.List { +func makeTestWorkList(ary []interface{}) *list.List { l := list.New() for _, n := range ary { l.PushBack(n) @@ -53,13 +53,13 @@ func expectChannelClosedWithin(t fatalfer, timeout time.Duration, c <-chan inter } } -func doWorkItems(t fatalfer, q *WorkQueue, expected []int) { +func doWorkItems(t fatalfer, q *WorkQueue, expected []interface{}) { for i := range expected { actual, ok := <-q.NextItem if !ok { t.Fatalf("Expected %+v but channel was closed after receiving %+v as expected.", expected, expected[:i]) } - q.ReportDone <- struct{}{} + q.DoneItem <- struct{}{} if actual.(int) != expected[i] { t.Fatalf("Expected %+v but received %+v after receiving %+v as expected.", expected[i], actual, expected[:i]) } @@ -84,40 +84,47 @@ func expectEqualWithin(t fatalfer, timeout time.Duration, expect interface{}, f } } -func expectCountQueued(t fatalfer, b *WorkQueue, expectCountQueued int) { - if l := b.CountQueued(); l != expectCountQueued { - t.Fatalf("Got CountQueued()==%d, expected %d", l, expectCountQueued) +func expectQueued(t fatalfer, b *WorkQueue, expectQueued int) { + if l := b.Status().Queued; l != expectQueued { + t.Fatalf("Got Queued==%d, expected %d", l, expectQueued) } } func TestWorkQueueDoneness(t *testing.T) { b := NewWorkQueue() defer b.Close() - b.ReplaceQueue(makeTestWorkList([]int{1, 2, 3})) - expectCountQueued(t, b, 3) + b.ReplaceQueue(makeTestWorkList([]interface{}{1, 2, 3})) + expectQueued(t, b, 3) + gate := make(chan struct{}) go func() { - for _ = range b.NextItem { - //time.Sleep(time.Duration(delay.(int)) * time.Millisecond) + <-gate + for range b.NextItem { + <-gate time.Sleep(time.Millisecond) - b.ReportDone <- struct{}{} + b.DoneItem <- struct{}{} } }() - expectEqualWithin(t, time.Second, 0, func() interface{} { return b.CountOutstanding() }) - b.ReplaceQueue(makeTestWorkList([]int{400, 5, 6})) - expectEqualWithin(t, time.Second, 3, func() interface{} { return b.CountOutstanding() }) - expectEqualWithin(t, time.Second, 0, func() interface{} { return b.CountOutstanding() }) + expectEqualWithin(t, time.Second, 0, func() interface{} { return b.Status().InProgress }) + b.ReplaceQueue(makeTestWorkList([]interface{}{4, 5, 6})) + for i := 1; i <= 3; i++ { + gate <- struct{}{} + expectEqualWithin(t, time.Second, 3-i, func() interface{} { return b.Status().Queued }) + expectEqualWithin(t, time.Second, 1, func() interface{} { return b.Status().InProgress }) + } + close(gate) + expectEqualWithin(t, time.Second, 0, func() interface{} { return b.Status().InProgress }) expectChannelEmpty(t, b.NextItem) } // Create a WorkQueue, generate a list for it, and instantiate a worker. func TestWorkQueueReadWrite(t *testing.T) { - var input = []int{1, 1, 2, 3, 5, 8, 13, 21, 34} + var input = []interface{}{1, 1, 2, 3, 5, 8, 13, 21, 34} b := NewWorkQueue() - expectCountQueued(t, b, 0) + expectQueued(t, b, 0) b.ReplaceQueue(makeTestWorkList(input)) - expectCountQueued(t, b, len(input)) + expectQueued(t, b, len(input)) doWorkItems(t, b, input) expectChannelEmpty(t, b.NextItem) @@ -126,7 +133,7 @@ func TestWorkQueueReadWrite(t *testing.T) { // Start a worker before the list has any input. func TestWorkQueueEarlyRead(t *testing.T) { - var input = []int{1, 1, 2, 3, 5, 8, 13, 21, 34} + var input = []interface{}{1, 1, 2, 3, 5, 8, 13, 21, 34} b := NewWorkQueue() defer b.Close() @@ -148,28 +155,28 @@ func TestWorkQueueEarlyRead(t *testing.T) { // finish. b.ReplaceQueue(makeTestWorkList(input)) <-done - expectCountQueued(t, b, 0) + expectQueued(t, b, 0) } // After Close(), NextItem closes, work finishes, then stats return zero. func TestWorkQueueClose(t *testing.T) { b := NewWorkQueue() - input := []int{1, 2, 3, 4, 5, 6, 7, 8} + input := []interface{}{1, 2, 3, 4, 5, 6, 7, 8} mark := make(chan struct{}) go func() { <-b.NextItem mark <- struct{}{} <-mark - b.ReportDone <- struct{}{} + b.DoneItem <- struct{}{} }() b.ReplaceQueue(makeTestWorkList(input)) // Wait for worker to take item 1 <-mark b.Close() - expectEqualWithin(t, time.Second, 1, func() interface{} { return b.CountOutstanding() }) + expectEqualWithin(t, time.Second, 1, func() interface{} { return b.Status().InProgress }) // Tell worker to report done mark <- struct{}{} - expectEqualWithin(t, time.Second, 0, func() interface{} { return b.CountOutstanding() }) + expectEqualWithin(t, time.Second, 0, func() interface{} { return b.Status().InProgress }) expectChannelClosedWithin(t, time.Second, b.NextItem) } @@ -178,8 +185,8 @@ func TestWorkQueueClose(t *testing.T) { // available. func TestWorkQueueReaderBlocks(t *testing.T) { var ( - inputBeforeBlock = []int{1, 2, 3, 4, 5} - inputAfterBlock = []int{6, 7, 8, 9, 10} + inputBeforeBlock = []interface{}{1, 2, 3, 4, 5} + inputAfterBlock = []interface{}{6, 7, 8, 9, 10} ) b := NewWorkQueue() @@ -212,8 +219,8 @@ func TestWorkQueueReaderBlocks(t *testing.T) { // Replace one active work list with another. func TestWorkQueueReplaceQueue(t *testing.T) { - var firstInput = []int{1, 1, 2, 3, 5, 8, 13, 21, 34} - var replaceInput = []int{1, 4, 9, 16, 25, 36, 49, 64, 81} + var firstInput = []interface{}{1, 1, 2, 3, 5, 8, 13, 21, 34} + var replaceInput = []interface{}{1, 4, 9, 16, 25, 36, 49, 64, 81} b := NewWorkQueue() b.ReplaceQueue(makeTestWorkList(firstInput))