+// expectEqualWithin polls f about once per millisecond until it returns a
+// value equal to expect. If that has not happened by the time timeout
+// elapses, it calls t.Fatalf with the current value of f, the expected value,
+// and the file:line of the caller.
+//
+// Values are compared with interface equality (f() != expect), so expect must
+// have the same dynamic type as the value returned by f.
+func expectEqualWithin(t fatalfer, timeout time.Duration, expect interface{}, f func() interface{}) {
+	ok := make(chan struct{})
+	// giveup is closed to tell the poller to stop. A channel is used instead
+	// of a shared bool so the two goroutines never access the same variable
+	// without synchronization.
+	giveup := make(chan struct{})
+	go func() {
+		tick := time.NewTicker(time.Millisecond)
+		defer tick.Stop()
+		for f() != expect {
+			select {
+			case <-giveup:
+				return
+			case <-tick.C:
+			}
+		}
+		close(ok)
+	}()
+	select {
+	case <-ok:
+	case <-time.After(timeout):
+		close(giveup)
+		// Skip one frame so the message points at the caller, not this helper.
+		_, file, line, _ := runtime.Caller(1)
+		t.Fatalf("Still getting %+v, timed out waiting for %+v\n%s:%d", f(), expect, file, line)
+	}
+}
+
+// expectQueued fails the test via t.Fatalf unless b.Status().Queued equals
+// expectQueued at the moment of the call.
+func expectQueued(t fatalfer, b *WorkQueue, expectQueued int) {
+	got := b.Status().Queued
+	if got == expectQueued {
+		return
+	}
+	t.Fatalf("Got Queued==%d, expected %d", got, expectQueued)
+}
+
+// TestWorkQueueDoneness checks the Queued and InProgress counters reported by
+// Status() while a gated worker receives items from NextItem and signals
+// completion on DoneItem.
+func TestWorkQueueDoneness(t *testing.T) {
+	wq := NewWorkQueue()
+	defer wq.Close()
+	// Status probes in the form expectEqualWithin wants.
+	queued := func() interface{} { return wq.Status().Queued }
+	inProgress := func() interface{} { return wq.Status().InProgress }
+	wq.ReplaceQueue(makeTestWorkList([]interface{}{1, 2, 3}))
+	expectQueued(t, wq, 3)
+	// The worker blocks on step before taking its first item and again
+	// before finishing each item, so the test controls its progress.
+	step := make(chan struct{})
+	go func() {
+		<-step
+		for range wq.NextItem {
+			<-step
+			time.Sleep(time.Millisecond)
+			wq.DoneItem <- struct{}{}
+		}
+	}()
+	expectEqualWithin(t, time.Second, 0, inProgress)
+	wq.ReplaceQueue(makeTestWorkList([]interface{}{4, 5, 6}))
+	// After each step one more item has left the queue and exactly one is
+	// in progress.
+	for remaining := 2; remaining >= 0; remaining-- {
+		step <- struct{}{}
+		expectEqualWithin(t, time.Second, remaining, queued)
+		expectEqualWithin(t, time.Second, 1, inProgress)
+	}
+	// Closing step lets every later receive in the worker return at once.
+	close(step)
+	expectEqualWithin(t, time.Second, 0, inProgress)
+	expectChannelEmpty(t, wq.NextItem)
+}
+