20318: Merge branch 'main' into 20318-disk-cache
[arvados.git] / services / keepstore / work_queue_test.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package keepstore
6
7 import (
8         "container/list"
9         "runtime"
10         "testing"
11         "time"
12 )
13
14 type fatalfer interface {
15         Fatalf(string, ...interface{})
16 }
17
18 func makeTestWorkList(ary []interface{}) *list.List {
19         l := list.New()
20         for _, n := range ary {
21                 l.PushBack(n)
22         }
23         return l
24 }
25
26 func expectChannelEmpty(t fatalfer, c <-chan interface{}) {
27         select {
28         case item, ok := <-c:
29                 if ok {
30                         t.Fatalf("Received value (%+v) from channel that we expected to be empty", item)
31                 }
32         default:
33         }
34 }
35
36 func expectChannelNotEmpty(t fatalfer, c <-chan interface{}) interface{} {
37         select {
38         case item, ok := <-c:
39                 if !ok {
40                         t.Fatalf("expected data on a closed channel")
41                 }
42                 return item
43         case <-time.After(time.Second):
44                 t.Fatalf("expected data on an empty channel")
45                 return nil
46         }
47 }
48
49 func expectChannelClosedWithin(t fatalfer, timeout time.Duration, c <-chan interface{}) {
50         select {
51         case received, ok := <-c:
52                 if ok {
53                         t.Fatalf("Expected channel to be closed, but received %+v instead", received)
54                 }
55         case <-time.After(timeout):
56                 t.Fatalf("Expected channel to be closed, but it is still open after %v", timeout)
57         }
58 }
59
60 func doWorkItems(t fatalfer, q *WorkQueue, expected []interface{}) {
61         for i := range expected {
62                 actual, ok := <-q.NextItem
63                 if !ok {
64                         t.Fatalf("Expected %+v but channel was closed after receiving %+v as expected.", expected, expected[:i])
65                 }
66                 q.DoneItem <- struct{}{}
67                 if actual.(int) != expected[i] {
68                         t.Fatalf("Expected %+v but received %+v after receiving %+v as expected.", expected[i], actual, expected[:i])
69                 }
70         }
71 }
72
73 func expectEqualWithin(t fatalfer, timeout time.Duration, expect interface{}, f func() interface{}) {
74         ok := make(chan struct{})
75         giveup := false
76         go func() {
77                 for f() != expect && !giveup {
78                         time.Sleep(time.Millisecond)
79                 }
80                 close(ok)
81         }()
82         select {
83         case <-ok:
84         case <-time.After(timeout):
85                 giveup = true
86                 _, file, line, _ := runtime.Caller(1)
87                 t.Fatalf("Still getting %+v, timed out waiting for %+v\n%s:%d", f(), expect, file, line)
88         }
89 }
90
91 func expectQueued(t fatalfer, b *WorkQueue, expectQueued int) {
92         if l := b.Status().Queued; l != expectQueued {
93                 t.Fatalf("Got Queued==%d, expected %d", l, expectQueued)
94         }
95 }
96
97 func TestWorkQueueDoneness(t *testing.T) {
98         b := NewWorkQueue()
99         defer b.Close()
100         b.ReplaceQueue(makeTestWorkList([]interface{}{1, 2, 3}))
101         expectQueued(t, b, 3)
102         gate := make(chan struct{})
103         go func() {
104                 <-gate
105                 for range b.NextItem {
106                         <-gate
107                         time.Sleep(time.Millisecond)
108                         b.DoneItem <- struct{}{}
109                 }
110         }()
111         expectEqualWithin(t, time.Second, 0, func() interface{} { return b.Status().InProgress })
112         b.ReplaceQueue(makeTestWorkList([]interface{}{4, 5, 6}))
113         for i := 1; i <= 3; i++ {
114                 gate <- struct{}{}
115                 expectEqualWithin(t, time.Second, 3-i, func() interface{} { return b.Status().Queued })
116                 expectEqualWithin(t, time.Second, 1, func() interface{} { return b.Status().InProgress })
117         }
118         close(gate)
119         expectEqualWithin(t, time.Second, 0, func() interface{} { return b.Status().InProgress })
120         expectChannelEmpty(t, b.NextItem)
121 }
122
123 // Create a WorkQueue, generate a list for it, and instantiate a worker.
124 func TestWorkQueueReadWrite(t *testing.T) {
125         var input = []interface{}{1, 1, 2, 3, 5, 8, 13, 21, 34}
126
127         b := NewWorkQueue()
128         expectQueued(t, b, 0)
129
130         b.ReplaceQueue(makeTestWorkList(input))
131         expectQueued(t, b, len(input))
132
133         doWorkItems(t, b, input)
134         expectChannelEmpty(t, b.NextItem)
135         b.Close()
136 }
137
138 // Start a worker before the list has any input.
139 func TestWorkQueueEarlyRead(t *testing.T) {
140         var input = []interface{}{1, 1, 2, 3, 5, 8, 13, 21, 34}
141
142         b := NewWorkQueue()
143         defer b.Close()
144
145         // First, demonstrate that nothing is available on the NextItem
146         // channel.
147         expectChannelEmpty(t, b.NextItem)
148
149         // Start a reader in a goroutine. The reader will block until the
150         // block work list has been initialized.
151         //
152         done := make(chan int)
153         go func() {
154                 doWorkItems(t, b, input)
155                 done <- 1
156         }()
157
158         // Feed the blocklist a new worklist, and wait for the worker to
159         // finish.
160         b.ReplaceQueue(makeTestWorkList(input))
161         <-done
162         expectQueued(t, b, 0)
163 }
164
165 // After Close(), NextItem closes, work finishes, then stats return zero.
166 func TestWorkQueueClose(t *testing.T) {
167         b := NewWorkQueue()
168         input := []interface{}{1, 2, 3, 4, 5, 6, 7, 8}
169         mark := make(chan struct{})
170         go func() {
171                 <-b.NextItem
172                 mark <- struct{}{}
173                 <-mark
174                 b.DoneItem <- struct{}{}
175         }()
176         b.ReplaceQueue(makeTestWorkList(input))
177         // Wait for worker to take item 1
178         <-mark
179         b.Close()
180         expectEqualWithin(t, time.Second, 1, func() interface{} { return b.Status().InProgress })
181         // Tell worker to report done
182         mark <- struct{}{}
183         expectEqualWithin(t, time.Second, 0, func() interface{} { return b.Status().InProgress })
184         expectChannelClosedWithin(t, time.Second, b.NextItem)
185 }
186
187 // Show that a reader may block when the manager's list is exhausted,
188 // and that the reader resumes automatically when new data is
189 // available.
190 func TestWorkQueueReaderBlocks(t *testing.T) {
191         var (
192                 inputBeforeBlock = []interface{}{1, 2, 3, 4, 5}
193                 inputAfterBlock  = []interface{}{6, 7, 8, 9, 10}
194         )
195
196         b := NewWorkQueue()
197         defer b.Close()
198         sendmore := make(chan int)
199         done := make(chan int)
200         go func() {
201                 doWorkItems(t, b, inputBeforeBlock)
202
203                 // Confirm that the channel is empty, so a subsequent read
204                 // on it will block.
205                 expectChannelEmpty(t, b.NextItem)
206
207                 // Signal that we're ready for more input.
208                 sendmore <- 1
209                 doWorkItems(t, b, inputAfterBlock)
210                 done <- 1
211         }()
212
213         // Write a slice of the first five elements and wait for the
214         // reader to signal that it's ready for us to send more input.
215         b.ReplaceQueue(makeTestWorkList(inputBeforeBlock))
216         <-sendmore
217
218         b.ReplaceQueue(makeTestWorkList(inputAfterBlock))
219
220         // Wait for the reader to complete.
221         <-done
222 }
223
224 // Replace one active work list with another.
225 func TestWorkQueueReplaceQueue(t *testing.T) {
226         var firstInput = []interface{}{1, 1, 2, 3, 5, 8, 13, 21, 34}
227         var replaceInput = []interface{}{1, 4, 9, 16, 25, 36, 49, 64, 81}
228
229         b := NewWorkQueue()
230         b.ReplaceQueue(makeTestWorkList(firstInput))
231
232         // Read just the first five elements from the work list.
233         // Confirm that the channel is not empty.
234         doWorkItems(t, b, firstInput[0:5])
235         expectChannelNotEmpty(t, b.NextItem)
236
237         // Replace the work list and read five more elements.
238         // The old list should have been discarded and all new
239         // elements come from the new list.
240         b.ReplaceQueue(makeTestWorkList(replaceInput))
241         doWorkItems(t, b, replaceInput[0:5])
242
243         b.Close()
244 }