// fatalfer is the one-method interface the helper functions in this
// excerpt accept as their first argument. The test functions here pass
// their *testing.T wherever a fatalfer is expected.
10 type fatalfer interface {
// Fatalf takes a format string followed by its arguments; the helpers
// call it to report a failed expectation.
11 Fatalf(string, ...interface{})
// makeTestWorkList takes a slice of arbitrary values and returns a
// *list.List; the tests pass its result to WorkQueue.ReplaceQueue.
// NOTE(review): the loop body and the return statement are not visible in
// this excerpt; presumably each element of ary is appended to a new list
// in slice order — confirm.
14 func makeTestWorkList(ary []interface{}) *list.List {
16 for _, n := range ary {
// expectChannelEmpty calls t.Fatalf, including the received value in the
// message, when an item is received from a channel that was expected to
// be empty.
// NOTE(review): the receive statement and its guard are elided from this
// excerpt, so whether the check is non-blocking cannot be confirmed here.
22 func expectChannelEmpty(t fatalfer, c <-chan interface{}) {
26 t.Fatalf("Received value (%+v) from channel that we expected to be empty", item)
// expectChannelNotEmpty expects data to be available on c. The failure
// messages indicate that it calls t.Fatalf when c is closed, and when no
// data arrives within one second.
// NOTE(review): the receive, its guard and the return statements are not
// visible here; presumably the received item is returned — confirm.
32 func expectChannelNotEmpty(t fatalfer, c <-chan interface{}) interface{} {
36 t.Fatalf("expected data on a closed channel")
// Give up after one second instead of blocking indefinitely.
39 case <-time.After(time.Second):
40 t.Fatalf("expected data on an empty channel")
// expectChannelClosedWithin waits up to timeout for c to be closed. It
// calls t.Fatalf if the timeout elapses first, and the message on the
// receive branch indicates that receiving a value is also a failure.
45 func expectChannelClosedWithin(t fatalfer, timeout time.Duration, c <-chan interface{}) {
47 case received, ok := <-c:
// NOTE(review): the condition guarding this failure is elided from this
// excerpt; presumably it fires only when ok is true — confirm.
49 t.Fatalf("Expected channel to be closed, but received %+v instead", received)
// Nothing was received and c was not closed before the deadline.
51 case <-time.After(timeout):
52 t.Fatalf("Expected channel to be closed, but it is still open after %v", timeout)
// doWorkItems acts as a worker for q: for each index of expected it
// receives one item from q.NextItem, signals completion on q.DoneItem,
// and then compares the received item with expected[i], calling t.Fatalf
// on a mismatch.
56 func doWorkItems(t fatalfer, q *WorkQueue, expected []interface{}) {
57 for i := range expected {
58 actual, ok := <-q.NextItem
// NOTE(review): the guard for this failure is elided from this excerpt;
// per the message it reports NextItem being closed before all expected
// items were received — confirm it is conditioned on !ok.
60 t.Fatalf("Expected %+v but channel was closed after receiving %+v as expected.", expected, expected[:i])
// Completion is signalled before the received value is checked.
62 q.DoneItem <- struct{}{}
// The single-value type assertion panics if the item is not an int.
63 if actual.(int) != expected[i] {
64 t.Fatalf("Expected %+v but received %+v after receiving %+v as expected.", expected[i], actual, expected[:i])
// expectEqualWithin polls f, sleeping one millisecond between calls,
// until f returns a value equal to expect. If timeout elapses, it calls
// t.Fatalf with the latest value from f, the expected value, and a
// file:line obtained from runtime.Caller(1).
//
// f() and expect are compared as interface values, so their dynamic
// types must match as well as their values.
69 func expectEqualWithin(t fatalfer, timeout time.Duration, expect interface{}, f func() interface{}) {
// NOTE(review): how ok is signalled is elided from this excerpt;
// presumably it is closed once the polling loop below finishes — confirm.
70 ok := make(chan struct{})
// Poll until f returns expect or giveup is set.
// NOTE(review): the declaration and assignment of giveup are not shown
// here. If it is written from a different goroutine than the one running
// this loop, without synchronization, that is a data race — confirm.
73 for f() != expect && !giveup {
74 time.Sleep(time.Millisecond)
80 case <-time.After(timeout):
// Caller(1) skips one stack frame, so (assuming this runs directly in
// expectEqualWithin's own frame) file and line identify the call site of
// expectEqualWithin rather than this helper.
82 _, file, line, _ := runtime.Caller(1)
83 t.Fatalf("Still getting %+v, timed out waiting for %+v\n%s:%d", f(), expect, file, line)
// expectQueued calls t.Fatalf unless b.Status().Queued equals the
// expectQueued argument. Note that the parameter expectQueued shadows
// this function's own name inside the body.
87 func expectQueued(t fatalfer, b *WorkQueue, expectQueued int) {
88 if l := b.Status().Queued; l != expectQueued {
89 t.Fatalf("Got Queued==%d, expected %d", l, expectQueued)
// TestWorkQueueDoneness checks the Queued and InProgress counters
// reported by Status() as items are taken from NextItem and acknowledged
// on DoneItem.
// NOTE(review): the construction of b, the goroutine around the consumer
// loop, and every use of gate are elided from this excerpt.
93 func TestWorkQueueDoneness(t *testing.T) {
96 b.ReplaceQueue(makeTestWorkList([]interface{}{1, 2, 3}))
98 gate := make(chan struct{})
// Consume items from NextItem, sleeping one millisecond before
// acknowledging each on DoneItem.
101 for range b.NextItem {
103 time.Sleep(time.Millisecond)
104 b.DoneItem <- struct{}{}
// Expect InProgress to reach 0 within one second.
107 expectEqualWithin(t, time.Second, 0, func() interface{} { return b.Status().InProgress })
108 b.ReplaceQueue(makeTestWorkList([]interface{}{4, 5, 6}))
109 for i := 1; i <= 3; i++ {
// On iteration i, expect 3-i items still queued and exactly one item in
// progress.
111 expectEqualWithin(t, time.Second, 3-i, func() interface{} { return b.Status().Queued })
112 expectEqualWithin(t, time.Second, 1, func() interface{} { return b.Status().InProgress })
// Finally nothing should be in progress and NextItem should offer nothing.
115 expectEqualWithin(t, time.Second, 0, func() interface{} { return b.Status().InProgress })
116 expectChannelEmpty(t, b.NextItem)
// TestWorkQueueReadWrite expects Queued to be 0 before ReplaceQueue is
// called and len(input) right after it, then has doWorkItems receive
// every input item and expects NextItem to be empty afterwards.
// NOTE(review): the construction of b is elided from this excerpt.
119 // Create a WorkQueue, generate a list for it, and instantiate a worker.
120 func TestWorkQueueReadWrite(t *testing.T) {
121 var input = []interface{}{1, 1, 2, 3, 5, 8, 13, 21, 34}
124 expectQueued(t, b, 0)
126 b.ReplaceQueue(makeTestWorkList(input))
127 expectQueued(t, b, len(input))
129 doWorkItems(t, b, input)
130 expectChannelEmpty(t, b.NextItem)
// TestWorkQueueEarlyRead calls doWorkItems before ReplaceQueue supplies
// any input, then supplies the input and expects Queued to end at 0.
// NOTE(review): the construction of b, the goroutine wrapper around
// doWorkItems, and the use of done are elided from this excerpt.
134 // Start a worker before the list has any input.
135 func TestWorkQueueEarlyRead(t *testing.T) {
136 var input = []interface{}{1, 1, 2, 3, 5, 8, 13, 21, 34}
141 // First, demonstrate that nothing is available on the NextItem
143 expectChannelEmpty(t, b.NextItem)
145 // Start a reader in a goroutine. The reader will block until the
146 // block work list has been initialized.
148 done := make(chan int)
150 doWorkItems(t, b, input)
154 // Feed the blocklist a new worklist, and wait for the worker to
156 b.ReplaceQueue(makeTestWorkList(input))
// Expect nothing left in the queue at this point.
158 expectQueued(t, b, 0)
// TestWorkQueueClose expects InProgress to go to 1 and then back to 0,
// and finally expects NextItem to be closed within one second.
// NOTE(review): the construction of b, the worker goroutine, every use
// of mark, and the Close() call itself are elided from this excerpt.
161 // After Close(), NextItem closes, work finishes, then stats return zero.
162 func TestWorkQueueClose(t *testing.T) {
164 input := []interface{}{1, 2, 3, 4, 5, 6, 7, 8}
165 mark := make(chan struct{})
170 b.DoneItem <- struct{}{}
172 b.ReplaceQueue(makeTestWorkList(input))
173 // Wait for worker to take item 1
176 expectEqualWithin(t, time.Second, 1, func() interface{} { return b.Status().InProgress })
177 // Tell worker to report done
179 expectEqualWithin(t, time.Second, 0, func() interface{} { return b.Status().InProgress })
180 expectChannelClosedWithin(t, time.Second, b.NextItem)
// TestWorkQueueReaderBlocks feeds the queue in two batches: the reader
// works through inputBeforeBlock, expects NextItem to be empty, and then
// works through inputAfterBlock once a second ReplaceQueue call supplies
// it.
// NOTE(review): the construction of b, the reader goroutine, and every
// send/receive on sendmore and done are elided from this excerpt.
183 // Show that a reader may block when the manager's list is exhausted,
184 // and that the reader resumes automatically when new data is
186 func TestWorkQueueReaderBlocks(t *testing.T) {
188 inputBeforeBlock = []interface{}{1, 2, 3, 4, 5}
189 inputAfterBlock = []interface{}{6, 7, 8, 9, 10}
194 sendmore := make(chan int)
195 done := make(chan int)
197 doWorkItems(t, b, inputBeforeBlock)
199 // Confirm that the channel is empty, so a subsequent read
201 expectChannelEmpty(t, b.NextItem)
203 // Signal that we're ready for more input.
205 doWorkItems(t, b, inputAfterBlock)
209 // Write a slice of the first five elements and wait for the
210 // reader to signal that it's ready for us to send more input.
211 b.ReplaceQueue(makeTestWorkList(inputBeforeBlock))
214 b.ReplaceQueue(makeTestWorkList(inputAfterBlock))
216 // Wait for the reader to complete.
// TestWorkQueueReplaceQueue works through the first five items of
// firstInput, expects NextItem to still have data, then calls
// ReplaceQueue with replaceInput and expects the next five items to be
// the first five of replaceInput.
// NOTE(review): the construction of b is elided, and the function may
// continue past the end of this excerpt.
220 // Replace one active work list with another.
221 func TestWorkQueueReplaceQueue(t *testing.T) {
222 var firstInput = []interface{}{1, 1, 2, 3, 5, 8, 13, 21, 34}
223 var replaceInput = []interface{}{1, 4, 9, 16, 25, 36, 49, 64, 81}
226 b.ReplaceQueue(makeTestWorkList(firstInput))
228 // Read just the first five elements from the work list.
229 // Confirm that the channel is not empty.
230 doWorkItems(t, b, firstInput[0:5])
231 expectChannelNotEmpty(t, b.NextItem)
233 // Replace the work list and read five more elements.
234 // The old list should have been discarded and all new
235 // elements come from the new list.
236 b.ReplaceQueue(makeTestWorkList(replaceInput))
237 doWorkItems(t, b, replaceInput[0:5])