4015: fix pipeline instance tests
[arvados.git] / services / keepstore / work_queue_test.go
1 package main
2
3 import (
4         "container/list"
5         "testing"
6 )
7
8 func makeTestWorkList(ary []int) *list.List {
9         l := list.New()
10         for _, n := range ary {
11                 l.PushBack(n)
12         }
13         return l
14 }
15
16 func expectChannelEmpty(t *testing.T, c <-chan interface{}) {
17         select {
18         case item := <-c:
19                 t.Fatalf("Received value (%v) from channel that we expected to be empty", item)
20         default:
21                 // no-op
22         }
23 }
24
25 func expectChannelNotEmpty(t *testing.T, c <-chan interface{}) {
26         if item, ok := <-c; !ok {
27                 t.Fatal("expected data on a closed channel")
28         } else if item == nil {
29                 t.Fatal("expected data on an empty channel")
30         }
31 }
32
33 func expectChannelClosed(t *testing.T, c <-chan interface{}) {
34         received, ok := <-c
35         if ok {
36                 t.Fatalf("Expected channel to be closed, but received %v instead", received)
37         }
38 }
39
40 func expectFromChannel(t *testing.T, c <-chan interface{}, expected []int) {
41         for i := range expected {
42                 actual, ok := <-c
43                 t.Logf("received %v", actual)
44                 if !ok {
45                         t.Fatalf("Expected %v but channel was closed after receiving the first %d elements correctly.", expected, i)
46                 } else if actual.(int) != expected[i] {
47                         t.Fatalf("Expected %v but received '%v' after receiving the first %d elements correctly.", expected[i], actual, i)
48                 }
49         }
50 }
51
52 // Create a WorkQueue, generate a list for it, and instantiate a worker.
53 func TestWorkQueueReadWrite(t *testing.T) {
54         var input = []int{1, 1, 2, 3, 5, 8, 13, 21, 34}
55
56         b := NewWorkQueue()
57         b.ReplaceQueue(makeTestWorkList(input))
58
59         expectFromChannel(t, b.NextItem, input)
60         expectChannelEmpty(t, b.NextItem)
61         b.Close()
62 }
63
64 // Start a worker before the list has any input.
65 func TestWorkQueueEarlyRead(t *testing.T) {
66         var input = []int{1, 1, 2, 3, 5, 8, 13, 21, 34}
67
68         b := NewWorkQueue()
69
70         // First, demonstrate that nothing is available on the NextItem
71         // channel.
72         expectChannelEmpty(t, b.NextItem)
73
74         // Start a reader in a goroutine. The reader will block until the
75         // block work list has been initialized.
76         //
77         done := make(chan int)
78         go func() {
79                 expectFromChannel(t, b.NextItem, input)
80                 b.Close()
81                 done <- 1
82         }()
83
84         // Feed the blocklist a new worklist, and wait for the worker to
85         // finish.
86         b.ReplaceQueue(makeTestWorkList(input))
87         <-done
88
89         expectChannelClosed(t, b.NextItem)
90 }
91
92 // Show that a reader may block when the manager's list is exhausted,
93 // and that the reader resumes automatically when new data is
94 // available.
95 func TestWorkQueueReaderBlocks(t *testing.T) {
96         var (
97                 inputBeforeBlock = []int{1, 2, 3, 4, 5}
98                 inputAfterBlock  = []int{6, 7, 8, 9, 10}
99         )
100
101         b := NewWorkQueue()
102         sendmore := make(chan int)
103         done := make(chan int)
104         go func() {
105                 expectFromChannel(t, b.NextItem, inputBeforeBlock)
106
107                 // Confirm that the channel is empty, so a subsequent read
108                 // on it will block.
109                 expectChannelEmpty(t, b.NextItem)
110
111                 // Signal that we're ready for more input.
112                 sendmore <- 1
113                 expectFromChannel(t, b.NextItem, inputAfterBlock)
114                 b.Close()
115                 done <- 1
116         }()
117
118         // Write a slice of the first five elements and wait for the
119         // reader to signal that it's ready for us to send more input.
120         b.ReplaceQueue(makeTestWorkList(inputBeforeBlock))
121         <-sendmore
122
123         b.ReplaceQueue(makeTestWorkList(inputAfterBlock))
124
125         // Wait for the reader to complete.
126         <-done
127 }
128
129 // Replace one active work list with another.
130 func TestWorkQueueReplaceQueue(t *testing.T) {
131         var firstInput = []int{1, 1, 2, 3, 5, 8, 13, 21, 34}
132         var replaceInput = []int{1, 4, 9, 16, 25, 36, 49, 64, 81}
133
134         b := NewWorkQueue()
135         b.ReplaceQueue(makeTestWorkList(firstInput))
136
137         // Read just the first five elements from the work list.
138         // Confirm that the channel is not empty.
139         expectFromChannel(t, b.NextItem, firstInput[0:5])
140         expectChannelNotEmpty(t, b.NextItem)
141
142         // Replace the work list and read five more elements.
143         // The old list should have been discarded and all new
144         // elements come from the new list.
145         b.ReplaceQueue(makeTestWorkList(replaceInput))
146         expectFromChannel(t, b.NextItem, replaceInput[0:5])
147
148         b.Close()
149 }