1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
14 type fatalfer interface {
15 Fatalf(string, ...interface{})
18 func makeTestWorkList(ary []interface{}) *list.List {
20 for _, n := range ary {
26 func expectChannelEmpty(t fatalfer, c <-chan interface{}) {
30 t.Fatalf("Received value (%+v) from channel that we expected to be empty", item)
36 func expectChannelNotEmpty(t fatalfer, c <-chan interface{}) interface{} {
40 t.Fatalf("expected data on a closed channel")
43 case <-time.After(time.Second):
44 t.Fatalf("expected data on an empty channel")
49 func expectChannelClosedWithin(t fatalfer, timeout time.Duration, c <-chan interface{}) {
51 case received, ok := <-c:
53 t.Fatalf("Expected channel to be closed, but received %+v instead", received)
55 case <-time.After(timeout):
56 t.Fatalf("Expected channel to be closed, but it is still open after %v", timeout)
60 func doWorkItems(t fatalfer, q *WorkQueue, expected []interface{}) {
61 for i := range expected {
62 actual, ok := <-q.NextItem
64 t.Fatalf("Expected %+v but channel was closed after receiving %+v as expected.", expected, expected[:i])
66 q.DoneItem <- struct{}{}
67 if actual.(int) != expected[i] {
68 t.Fatalf("Expected %+v but received %+v after receiving %+v as expected.", expected[i], actual, expected[:i])
73 func expectEqualWithin(t fatalfer, timeout time.Duration, expect interface{}, f func() interface{}) {
74 ok := make(chan struct{})
77 for f() != expect && !giveup {
78 time.Sleep(time.Millisecond)
84 case <-time.After(timeout):
86 _, file, line, _ := runtime.Caller(1)
87 t.Fatalf("Still getting %+v, timed out waiting for %+v\n%s:%d", f(), expect, file, line)
91 func expectQueued(t fatalfer, b *WorkQueue, expectQueued int) {
92 if l := b.Status().Queued; l != expectQueued {
93 t.Fatalf("Got Queued==%d, expected %d", l, expectQueued)
97 func TestWorkQueueDoneness(t *testing.T) {
100 b.ReplaceQueue(makeTestWorkList([]interface{}{1, 2, 3}))
101 expectQueued(t, b, 3)
102 gate := make(chan struct{})
105 for range b.NextItem {
107 time.Sleep(time.Millisecond)
108 b.DoneItem <- struct{}{}
111 expectEqualWithin(t, time.Second, 0, func() interface{} { return b.Status().InProgress })
112 b.ReplaceQueue(makeTestWorkList([]interface{}{4, 5, 6}))
113 for i := 1; i <= 3; i++ {
115 expectEqualWithin(t, time.Second, 3-i, func() interface{} { return b.Status().Queued })
116 expectEqualWithin(t, time.Second, 1, func() interface{} { return b.Status().InProgress })
119 expectEqualWithin(t, time.Second, 0, func() interface{} { return b.Status().InProgress })
120 expectChannelEmpty(t, b.NextItem)
123 // Create a WorkQueue, generate a list for it, and instantiate a worker.
124 func TestWorkQueueReadWrite(t *testing.T) {
125 var input = []interface{}{1, 1, 2, 3, 5, 8, 13, 21, 34}
128 expectQueued(t, b, 0)
130 b.ReplaceQueue(makeTestWorkList(input))
131 expectQueued(t, b, len(input))
133 doWorkItems(t, b, input)
134 expectChannelEmpty(t, b.NextItem)
138 // Start a worker before the list has any input.
139 func TestWorkQueueEarlyRead(t *testing.T) {
140 var input = []interface{}{1, 1, 2, 3, 5, 8, 13, 21, 34}
145 // First, demonstrate that nothing is available on the NextItem
147 expectChannelEmpty(t, b.NextItem)
149 // Start a reader in a goroutine. The reader will block until the
150 // block work list has been initialized.
152 done := make(chan int)
154 doWorkItems(t, b, input)
158 // Feed the blocklist a new worklist, and wait for the worker to
160 b.ReplaceQueue(makeTestWorkList(input))
162 expectQueued(t, b, 0)
165 // After Close(), NextItem closes, work finishes, then stats return zero.
166 func TestWorkQueueClose(t *testing.T) {
168 input := []interface{}{1, 2, 3, 4, 5, 6, 7, 8}
169 mark := make(chan struct{})
174 b.DoneItem <- struct{}{}
176 b.ReplaceQueue(makeTestWorkList(input))
177 // Wait for worker to take item 1
180 expectEqualWithin(t, time.Second, 1, func() interface{} { return b.Status().InProgress })
181 // Tell worker to report done
183 expectEqualWithin(t, time.Second, 0, func() interface{} { return b.Status().InProgress })
184 expectChannelClosedWithin(t, time.Second, b.NextItem)
187 // Show that a reader may block when the manager's list is exhausted,
188 // and that the reader resumes automatically when new data is
190 func TestWorkQueueReaderBlocks(t *testing.T) {
192 inputBeforeBlock = []interface{}{1, 2, 3, 4, 5}
193 inputAfterBlock = []interface{}{6, 7, 8, 9, 10}
198 sendmore := make(chan int)
199 done := make(chan int)
201 doWorkItems(t, b, inputBeforeBlock)
203 // Confirm that the channel is empty, so a subsequent read
205 expectChannelEmpty(t, b.NextItem)
207 // Signal that we're ready for more input.
209 doWorkItems(t, b, inputAfterBlock)
213 // Write a slice of the first five elements and wait for the
214 // reader to signal that it's ready for us to send more input.
215 b.ReplaceQueue(makeTestWorkList(inputBeforeBlock))
218 b.ReplaceQueue(makeTestWorkList(inputAfterBlock))
220 // Wait for the reader to complete.
224 // Replace one active work list with another.
225 func TestWorkQueueReplaceQueue(t *testing.T) {
226 var firstInput = []interface{}{1, 1, 2, 3, 5, 8, 13, 21, 34}
227 var replaceInput = []interface{}{1, 4, 9, 16, 25, 36, 49, 64, 81}
230 b.ReplaceQueue(makeTestWorkList(firstInput))
232 // Read just the first five elements from the work list.
233 // Confirm that the channel is not empty.
234 doWorkItems(t, b, firstInput[0:5])
235 expectChannelNotEmpty(t, b.NextItem)
237 // Replace the work list and read five more elements.
238 // The old list should have been discarded and all new
239 // elements come from the new list.
240 b.ReplaceQueue(makeTestWorkList(replaceInput))
241 doWorkItems(t, b, replaceInput[0:5])