6260: Fix races in keepstore tests. Expose WorkQueue in-progress/queued stats.
[arvados.git] / services / keepstore / work_queue_test.go
1 package main
2
3 import (
4         "container/list"
5         "runtime"
6         "testing"
7         "time"
8 )
9
10 type fatalfer interface {
11         Fatalf(string, ...interface{})
12 }
13
14 func makeTestWorkList(ary []int) *list.List {
15         l := list.New()
16         for _, n := range ary {
17                 l.PushBack(n)
18         }
19         return l
20 }
21
22 func expectChannelEmpty(t fatalfer, c <-chan interface{}) {
23         select {
24         case item, ok := <-c:
25                 if ok {
26                         t.Fatalf("Received value (%+v) from channel that we expected to be empty", item)
27                 }
28         default:
29         }
30 }
31
32 func expectChannelNotEmpty(t fatalfer, c <-chan interface{}) interface{} {
33         select {
34         case item, ok := <-c:
35                 if !ok {
36                         t.Fatalf("expected data on a closed channel")
37                 }
38                 return item
39         case <-time.After(time.Second):
40                 t.Fatalf("expected data on an empty channel")
41                 return nil
42         }
43 }
44
45 func expectChannelClosedWithin(t fatalfer, timeout time.Duration, c <-chan interface{}) {
46         select {
47         case received, ok := <-c:
48                 if ok {
49                         t.Fatalf("Expected channel to be closed, but received %+v instead", received)
50                 }
51         case <-time.After(timeout):
52                 t.Fatalf("Expected channel to be closed, but it is still open after %v", timeout)
53         }
54 }
55
56 func doWorkItems(t fatalfer, q *WorkQueue, expected []int) {
57         for i := range expected {
58                 actual, ok := <-q.NextItem
59                 if !ok {
60                         t.Fatalf("Expected %+v but channel was closed after receiving %+v as expected.", expected, expected[:i])
61                 }
62                 q.ReportDone <- struct{}{}
63                 if actual.(int) != expected[i] {
64                         t.Fatalf("Expected %+v but received %+v after receiving %+v as expected.", expected[i], actual, expected[:i])
65                 }
66         }
67 }
68
69 func expectEqualWithin(t fatalfer, timeout time.Duration, expect interface{}, f func() interface{}) {
70         ok := make(chan struct{})
71         giveup := false
72         go func() {
73                 for f() != expect && !giveup {
74                         time.Sleep(time.Millisecond)
75                 }
76                 close(ok)
77         }()
78         select {
79         case <-ok:
80         case <-time.After(timeout):
81                 giveup = true
82                 _, file, line, _ := runtime.Caller(1)
83                 t.Fatalf("Still getting %+v, timed out waiting for %+v\n%s:%d", f(), expect, file, line)
84         }
85 }
86
87 func expectCountQueued(t fatalfer, b *WorkQueue, expectCountQueued int) {
88         if l := b.CountQueued(); l != expectCountQueued {
89                 t.Fatalf("Got CountQueued()==%d, expected %d", l, expectCountQueued)
90         }
91 }
92
93 func TestWorkQueueDoneness(t *testing.T) {
94         b := NewWorkQueue()
95         defer b.Close()
96         b.ReplaceQueue(makeTestWorkList([]int{1, 2, 3}))
97         expectCountQueued(t, b, 3)
98         go func() {
99                 for _ = range b.NextItem {
100                         //time.Sleep(time.Duration(delay.(int)) * time.Millisecond)
101                         time.Sleep(time.Millisecond)
102                         b.ReportDone <- struct{}{}
103                 }
104         }()
105         expectEqualWithin(t, time.Second, 0, func() interface{} { return b.CountOutstanding() })
106         b.ReplaceQueue(makeTestWorkList([]int{400, 5, 6}))
107         expectEqualWithin(t, time.Second, 3, func() interface{} { return b.CountOutstanding() })
108         expectEqualWithin(t, time.Second, 0, func() interface{} { return b.CountOutstanding() })
109         expectChannelEmpty(t, b.NextItem)
110 }
111
112 // Create a WorkQueue, generate a list for it, and instantiate a worker.
113 func TestWorkQueueReadWrite(t *testing.T) {
114         var input = []int{1, 1, 2, 3, 5, 8, 13, 21, 34}
115
116         b := NewWorkQueue()
117         expectCountQueued(t, b, 0)
118
119         b.ReplaceQueue(makeTestWorkList(input))
120         expectCountQueued(t, b, len(input))
121
122         doWorkItems(t, b, input)
123         expectChannelEmpty(t, b.NextItem)
124         b.Close()
125 }
126
127 // Start a worker before the list has any input.
128 func TestWorkQueueEarlyRead(t *testing.T) {
129         var input = []int{1, 1, 2, 3, 5, 8, 13, 21, 34}
130
131         b := NewWorkQueue()
132         defer b.Close()
133
134         // First, demonstrate that nothing is available on the NextItem
135         // channel.
136         expectChannelEmpty(t, b.NextItem)
137
138         // Start a reader in a goroutine. The reader will block until the
139         // block work list has been initialized.
140         //
141         done := make(chan int)
142         go func() {
143                 doWorkItems(t, b, input)
144                 done <- 1
145         }()
146
147         // Feed the blocklist a new worklist, and wait for the worker to
148         // finish.
149         b.ReplaceQueue(makeTestWorkList(input))
150         <-done
151         expectCountQueued(t, b, 0)
152 }
153
154 // After Close(), NextItem closes, work finishes, then stats return zero.
155 func TestWorkQueueClose(t *testing.T) {
156         b := NewWorkQueue()
157         input := []int{1, 2, 3, 4, 5, 6, 7, 8}
158         mark := make(chan struct{})
159         go func() {
160                 <-b.NextItem
161                 mark <- struct{}{}
162                 <-mark
163                 b.ReportDone <- struct{}{}
164         }()
165         b.ReplaceQueue(makeTestWorkList(input))
166         // Wait for worker to take item 1
167         <-mark
168         b.Close()
169         expectEqualWithin(t, time.Second, 1, func() interface{} { return b.CountOutstanding() })
170         // Tell worker to report done
171         mark <- struct{}{}
172         expectEqualWithin(t, time.Second, 0, func() interface{} { return b.CountOutstanding() })
173         expectChannelClosedWithin(t, time.Second, b.NextItem)
174 }
175
176 // Show that a reader may block when the manager's list is exhausted,
177 // and that the reader resumes automatically when new data is
178 // available.
179 func TestWorkQueueReaderBlocks(t *testing.T) {
180         var (
181                 inputBeforeBlock = []int{1, 2, 3, 4, 5}
182                 inputAfterBlock  = []int{6, 7, 8, 9, 10}
183         )
184
185         b := NewWorkQueue()
186         defer b.Close()
187         sendmore := make(chan int)
188         done := make(chan int)
189         go func() {
190                 doWorkItems(t, b, inputBeforeBlock)
191
192                 // Confirm that the channel is empty, so a subsequent read
193                 // on it will block.
194                 expectChannelEmpty(t, b.NextItem)
195
196                 // Signal that we're ready for more input.
197                 sendmore <- 1
198                 doWorkItems(t, b, inputAfterBlock)
199                 done <- 1
200         }()
201
202         // Write a slice of the first five elements and wait for the
203         // reader to signal that it's ready for us to send more input.
204         b.ReplaceQueue(makeTestWorkList(inputBeforeBlock))
205         <-sendmore
206
207         b.ReplaceQueue(makeTestWorkList(inputAfterBlock))
208
209         // Wait for the reader to complete.
210         <-done
211 }
212
213 // Replace one active work list with another.
214 func TestWorkQueueReplaceQueue(t *testing.T) {
215         var firstInput = []int{1, 1, 2, 3, 5, 8, 13, 21, 34}
216         var replaceInput = []int{1, 4, 9, 16, 25, 36, 49, 64, 81}
217
218         b := NewWorkQueue()
219         b.ReplaceQueue(makeTestWorkList(firstInput))
220
221         // Read just the first five elements from the work list.
222         // Confirm that the channel is not empty.
223         doWorkItems(t, b, firstInput[0:5])
224         expectChannelNotEmpty(t, b.NextItem)
225
226         // Replace the work list and read five more elements.
227         // The old list should have been discarded and all new
228         // elements come from the new list.
229         b.ReplaceQueue(makeTestWorkList(replaceInput))
230         doWorkItems(t, b, replaceInput[0:5])
231
232         b.Close()
233 }