10 type fatalfer interface {
11 Fatalf(string, ...interface{})
14 func makeTestWorkList(ary []int) *list.List {
16 for _, n := range ary {
22 func expectChannelEmpty(t fatalfer, c <-chan interface{}) {
26 t.Fatalf("Received value (%+v) from channel that we expected to be empty", item)
32 func expectChannelNotEmpty(t fatalfer, c <-chan interface{}) interface{} {
36 t.Fatalf("expected data on a closed channel")
39 case <-time.After(time.Second):
40 t.Fatalf("expected data on an empty channel")
45 func expectChannelClosedWithin(t fatalfer, timeout time.Duration, c <-chan interface{}) {
47 case received, ok := <-c:
49 t.Fatalf("Expected channel to be closed, but received %+v instead", received)
51 case <-time.After(timeout):
52 t.Fatalf("Expected channel to be closed, but it is still open after %v", timeout)
56 func doWorkItems(t fatalfer, q *WorkQueue, expected []int) {
57 for i := range expected {
58 actual, ok := <-q.NextItem
60 t.Fatalf("Expected %+v but channel was closed after receiving %+v as expected.", expected, expected[:i])
62 q.ReportDone <- struct{}{}
63 if actual.(int) != expected[i] {
64 t.Fatalf("Expected %+v but received %+v after receiving %+v as expected.", expected[i], actual, expected[:i])
69 func expectEqualWithin(t fatalfer, timeout time.Duration, expect interface{}, f func() interface{}) {
70 ok := make(chan struct{})
73 for f() != expect && !giveup {
74 time.Sleep(time.Millisecond)
80 case <-time.After(timeout):
82 _, file, line, _ := runtime.Caller(1)
83 t.Fatalf("Still getting %+v, timed out waiting for %+v\n%s:%d", f(), expect, file, line)
87 func expectCountQueued(t fatalfer, b *WorkQueue, expectCountQueued int) {
88 if l := b.CountQueued(); l != expectCountQueued {
89 t.Fatalf("Got CountQueued()==%d, expected %d", l, expectCountQueued)
93 func TestWorkQueueDoneness(t *testing.T) {
96 b.ReplaceQueue(makeTestWorkList([]int{1, 2, 3}))
97 expectCountQueued(t, b, 3)
99 for _ = range b.NextItem {
100 //time.Sleep(time.Duration(delay.(int)) * time.Millisecond)
101 time.Sleep(time.Millisecond)
102 b.ReportDone <- struct{}{}
105 expectEqualWithin(t, time.Second, 0, func() interface{} { return b.CountOutstanding() })
106 b.ReplaceQueue(makeTestWorkList([]int{400, 5, 6}))
107 expectEqualWithin(t, time.Second, 3, func() interface{} { return b.CountOutstanding() })
108 expectEqualWithin(t, time.Second, 0, func() interface{} { return b.CountOutstanding() })
109 expectChannelEmpty(t, b.NextItem)
112 // Create a WorkQueue, generate a list for it, and instantiate a worker.
113 func TestWorkQueueReadWrite(t *testing.T) {
114 var input = []int{1, 1, 2, 3, 5, 8, 13, 21, 34}
117 expectCountQueued(t, b, 0)
119 b.ReplaceQueue(makeTestWorkList(input))
120 expectCountQueued(t, b, len(input))
122 doWorkItems(t, b, input)
123 expectChannelEmpty(t, b.NextItem)
127 // Start a worker before the list has any input.
128 func TestWorkQueueEarlyRead(t *testing.T) {
129 var input = []int{1, 1, 2, 3, 5, 8, 13, 21, 34}
134 // First, demonstrate that nothing is available on the NextItem
136 expectChannelEmpty(t, b.NextItem)
138 // Start a reader in a goroutine. The reader will block until the
139 // block work list has been initialized.
141 done := make(chan int)
143 doWorkItems(t, b, input)
147 // Feed the blocklist a new worklist, and wait for the worker to
149 b.ReplaceQueue(makeTestWorkList(input))
151 expectCountQueued(t, b, 0)
154 // After Close(), NextItem closes, work finishes, then stats return zero.
155 func TestWorkQueueClose(t *testing.T) {
157 input := []int{1, 2, 3, 4, 5, 6, 7, 8}
158 mark := make(chan struct{})
163 b.ReportDone <- struct{}{}
165 b.ReplaceQueue(makeTestWorkList(input))
166 // Wait for worker to take item 1
169 expectEqualWithin(t, time.Second, 1, func() interface{} { return b.CountOutstanding() })
170 // Tell worker to report done
172 expectEqualWithin(t, time.Second, 0, func() interface{} { return b.CountOutstanding() })
173 expectChannelClosedWithin(t, time.Second, b.NextItem)
176 // Show that a reader may block when the manager's list is exhausted,
177 // and that the reader resumes automatically when new data is
179 func TestWorkQueueReaderBlocks(t *testing.T) {
181 inputBeforeBlock = []int{1, 2, 3, 4, 5}
182 inputAfterBlock = []int{6, 7, 8, 9, 10}
187 sendmore := make(chan int)
188 done := make(chan int)
190 doWorkItems(t, b, inputBeforeBlock)
192 // Confirm that the channel is empty, so a subsequent read
194 expectChannelEmpty(t, b.NextItem)
196 // Signal that we're ready for more input.
198 doWorkItems(t, b, inputAfterBlock)
202 // Write a slice of the first five elements and wait for the
203 // reader to signal that it's ready for us to send more input.
204 b.ReplaceQueue(makeTestWorkList(inputBeforeBlock))
207 b.ReplaceQueue(makeTestWorkList(inputAfterBlock))
209 // Wait for the reader to complete.
213 // Replace one active work list with another.
214 func TestWorkQueueReplaceQueue(t *testing.T) {
215 var firstInput = []int{1, 1, 2, 3, 5, 8, 13, 21, 34}
216 var replaceInput = []int{1, 4, 9, 16, 25, 36, 49, 64, 81}
219 b.ReplaceQueue(makeTestWorkList(firstInput))
221 // Read just the first five elements from the work list.
222 // Confirm that the channel is not empty.
223 doWorkItems(t, b, firstInput[0:5])
224 expectChannelNotEmpty(t, b.NextItem)
226 // Replace the work list and read five more elements.
227 // The old list should have been discarded and all new
228 // elements come from the new list.
229 b.ReplaceQueue(makeTestWorkList(replaceInput))
230 doWorkItems(t, b, replaceInput[0:5])