Merge branch '11065-rotate-logs'
[arvados.git] / services / keepstore / work_queue_test.go
1 package main
2
3 import (
4         "container/list"
5         "runtime"
6         "testing"
7         "time"
8 )
9
10 type fatalfer interface {
11         Fatalf(string, ...interface{})
12 }
13
14 func makeTestWorkList(ary []int) *list.List {
15         l := list.New()
16         for _, n := range ary {
17                 l.PushBack(n)
18         }
19         return l
20 }
21
22 func expectChannelEmpty(t fatalfer, c <-chan interface{}) {
23         select {
24         case item, ok := <-c:
25                 if ok {
26                         t.Fatalf("Received value (%+v) from channel that we expected to be empty", item)
27                 }
28         default:
29         }
30 }
31
32 func expectChannelNotEmpty(t fatalfer, c <-chan interface{}) interface{} {
33         select {
34         case item, ok := <-c:
35                 if !ok {
36                         t.Fatalf("expected data on a closed channel")
37                 }
38                 return item
39         case <-time.After(time.Second):
40                 t.Fatalf("expected data on an empty channel")
41                 return nil
42         }
43 }
44
45 func expectChannelClosedWithin(t fatalfer, timeout time.Duration, c <-chan interface{}) {
46         select {
47         case received, ok := <-c:
48                 if ok {
49                         t.Fatalf("Expected channel to be closed, but received %+v instead", received)
50                 }
51         case <-time.After(timeout):
52                 t.Fatalf("Expected channel to be closed, but it is still open after %v", timeout)
53         }
54 }
55
56 func doWorkItems(t fatalfer, q *WorkQueue, expected []int) {
57         for i := range expected {
58                 actual, ok := <-q.NextItem
59                 if !ok {
60                         t.Fatalf("Expected %+v but channel was closed after receiving %+v as expected.", expected, expected[:i])
61                 }
62                 q.DoneItem <- struct{}{}
63                 if actual.(int) != expected[i] {
64                         t.Fatalf("Expected %+v but received %+v after receiving %+v as expected.", expected[i], actual, expected[:i])
65                 }
66         }
67 }
68
69 func expectEqualWithin(t fatalfer, timeout time.Duration, expect interface{}, f func() interface{}) {
70         ok := make(chan struct{})
71         giveup := false
72         go func() {
73                 for f() != expect && !giveup {
74                         time.Sleep(time.Millisecond)
75                 }
76                 close(ok)
77         }()
78         select {
79         case <-ok:
80         case <-time.After(timeout):
81                 giveup = true
82                 _, file, line, _ := runtime.Caller(1)
83                 t.Fatalf("Still getting %+v, timed out waiting for %+v\n%s:%d", f(), expect, file, line)
84         }
85 }
86
87 func expectQueued(t fatalfer, b *WorkQueue, expectQueued int) {
88         if l := b.Status().Queued; l != expectQueued {
89                 t.Fatalf("Got Queued==%d, expected %d", l, expectQueued)
90         }
91 }
92
93 func TestWorkQueueDoneness(t *testing.T) {
94         b := NewWorkQueue()
95         defer b.Close()
96         b.ReplaceQueue(makeTestWorkList([]int{1, 2, 3}))
97         expectQueued(t, b, 3)
98         gate := make(chan struct{})
99         go func() {
100                 <-gate
101                 for range b.NextItem {
102                         <-gate
103                         time.Sleep(time.Millisecond)
104                         b.DoneItem <- struct{}{}
105                 }
106         }()
107         expectEqualWithin(t, time.Second, 0, func() interface{} { return b.Status().InProgress })
108         b.ReplaceQueue(makeTestWorkList([]int{4, 5, 6}))
109         for i := 1; i <= 3; i++ {
110                 gate <- struct{}{}
111                 expectEqualWithin(t, time.Second, 3-i, func() interface{} { return b.Status().Queued })
112                 expectEqualWithin(t, time.Second, 1, func() interface{} { return b.Status().InProgress })
113         }
114         close(gate)
115         expectEqualWithin(t, time.Second, 0, func() interface{} { return b.Status().InProgress })
116         expectChannelEmpty(t, b.NextItem)
117 }
118
119 // Create a WorkQueue, generate a list for it, and instantiate a worker.
120 func TestWorkQueueReadWrite(t *testing.T) {
121         var input = []int{1, 1, 2, 3, 5, 8, 13, 21, 34}
122
123         b := NewWorkQueue()
124         expectQueued(t, b, 0)
125
126         b.ReplaceQueue(makeTestWorkList(input))
127         expectQueued(t, b, len(input))
128
129         doWorkItems(t, b, input)
130         expectChannelEmpty(t, b.NextItem)
131         b.Close()
132 }
133
134 // Start a worker before the list has any input.
135 func TestWorkQueueEarlyRead(t *testing.T) {
136         var input = []int{1, 1, 2, 3, 5, 8, 13, 21, 34}
137
138         b := NewWorkQueue()
139         defer b.Close()
140
141         // First, demonstrate that nothing is available on the NextItem
142         // channel.
143         expectChannelEmpty(t, b.NextItem)
144
145         // Start a reader in a goroutine. The reader will block until the
146         // block work list has been initialized.
147         //
148         done := make(chan int)
149         go func() {
150                 doWorkItems(t, b, input)
151                 done <- 1
152         }()
153
154         // Feed the blocklist a new worklist, and wait for the worker to
155         // finish.
156         b.ReplaceQueue(makeTestWorkList(input))
157         <-done
158         expectQueued(t, b, 0)
159 }
160
161 // After Close(), NextItem closes, work finishes, then stats return zero.
162 func TestWorkQueueClose(t *testing.T) {
163         b := NewWorkQueue()
164         input := []int{1, 2, 3, 4, 5, 6, 7, 8}
165         mark := make(chan struct{})
166         go func() {
167                 <-b.NextItem
168                 mark <- struct{}{}
169                 <-mark
170                 b.DoneItem <- struct{}{}
171         }()
172         b.ReplaceQueue(makeTestWorkList(input))
173         // Wait for worker to take item 1
174         <-mark
175         b.Close()
176         expectEqualWithin(t, time.Second, 1, func() interface{} { return b.Status().InProgress })
177         // Tell worker to report done
178         mark <- struct{}{}
179         expectEqualWithin(t, time.Second, 0, func() interface{} { return b.Status().InProgress })
180         expectChannelClosedWithin(t, time.Second, b.NextItem)
181 }
182
183 // Show that a reader may block when the manager's list is exhausted,
184 // and that the reader resumes automatically when new data is
185 // available.
186 func TestWorkQueueReaderBlocks(t *testing.T) {
187         var (
188                 inputBeforeBlock = []int{1, 2, 3, 4, 5}
189                 inputAfterBlock  = []int{6, 7, 8, 9, 10}
190         )
191
192         b := NewWorkQueue()
193         defer b.Close()
194         sendmore := make(chan int)
195         done := make(chan int)
196         go func() {
197                 doWorkItems(t, b, inputBeforeBlock)
198
199                 // Confirm that the channel is empty, so a subsequent read
200                 // on it will block.
201                 expectChannelEmpty(t, b.NextItem)
202
203                 // Signal that we're ready for more input.
204                 sendmore <- 1
205                 doWorkItems(t, b, inputAfterBlock)
206                 done <- 1
207         }()
208
209         // Write a slice of the first five elements and wait for the
210         // reader to signal that it's ready for us to send more input.
211         b.ReplaceQueue(makeTestWorkList(inputBeforeBlock))
212         <-sendmore
213
214         b.ReplaceQueue(makeTestWorkList(inputAfterBlock))
215
216         // Wait for the reader to complete.
217         <-done
218 }
219
220 // Replace one active work list with another.
221 func TestWorkQueueReplaceQueue(t *testing.T) {
222         var firstInput = []int{1, 1, 2, 3, 5, 8, 13, 21, 34}
223         var replaceInput = []int{1, 4, 9, 16, 25, 36, 49, 64, 81}
224
225         b := NewWorkQueue()
226         b.ReplaceQueue(makeTestWorkList(firstInput))
227
228         // Read just the first five elements from the work list.
229         // Confirm that the channel is not empty.
230         doWorkItems(t, b, firstInput[0:5])
231         expectChannelNotEmpty(t, b.NextItem)
232
233         // Replace the work list and read five more elements.
234         // The old list should have been discarded and all new
235         // elements come from the new list.
236         b.ReplaceQueue(makeTestWorkList(replaceInput))
237         doWorkItems(t, b, replaceInput[0:5])
238
239         b.Close()
240 }