projects
/
arvados.git
/ blobdiff
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
Make sure to bump the dependency on the arvados gems to the 1.2 series.
[arvados.git]
/
services
/
keepstore
/
work_queue_test.go
diff --git a/services/keepstore/work_queue_test.go b/services/keepstore/work_queue_test.go
index 7844a2b87cde29546239f505dc5cf949781f87ab..8a26c090c9dd5cedc690f34a9f879f5eb842a40e 100644
(file)
--- a/services/keepstore/work_queue_test.go
+++ b/services/keepstore/work_queue_test.go
@@ -1,3 +1,7 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
package main
import (
package main
import (
@@ -11,7 +15,7 @@
type fatalfer interface {
Fatalf(string, ...interface{})
}
Fatalf(string, ...interface{})
}
-func makeTestWorkList(ary []int) *list.List {
+func makeTestWorkList(ary []interface{}) *list.List {
l := list.New()
for _, n := range ary {
l.PushBack(n)
l := list.New()
for _, n := range ary {
l.PushBack(n)
@@ -53,13 +57,13 @@
func expectChannelClosedWithin(t fatalfer, timeout time.Duration, c <-chan inter
}
}
}
}
-func doWorkItems(t fatalfer, q *WorkQueue, expected []int) {
+func doWorkItems(t fatalfer, q *WorkQueue, expected []interface{}) {
for i := range expected {
actual, ok := <-q.NextItem
if !ok {
t.Fatalf("Expected %+v but channel was closed after receiving %+v as expected.", expected, expected[:i])
}
for i := range expected {
actual, ok := <-q.NextItem
if !ok {
t.Fatalf("Expected %+v but channel was closed after receiving %+v as expected.", expected, expected[:i])
}
- q.ReportDone <- struct{}{}
+ q.DoneItem <- struct{}{}
if actual.(int) != expected[i] {
t.Fatalf("Expected %+v but received %+v after receiving %+v as expected.", expected[i], actual, expected[:i])
}
if actual.(int) != expected[i] {
t.Fatalf("Expected %+v but received %+v after receiving %+v as expected.", expected[i], actual, expected[:i])
}
@@ -93,19 +97,19 @@
func expectQueued(t fatalfer, b *WorkQueue, expectQueued int) {
func TestWorkQueueDoneness(t *testing.T) {
b := NewWorkQueue()
defer b.Close()
func TestWorkQueueDoneness(t *testing.T) {
b := NewWorkQueue()
defer b.Close()
- b.ReplaceQueue(makeTestWorkList([]int{1, 2, 3}))
+ b.ReplaceQueue(makeTestWorkList([]interface{}{1, 2, 3}))
expectQueued(t, b, 3)
gate := make(chan struct{})
go func() {
<-gate
expectQueued(t, b, 3)
gate := make(chan struct{})
go func() {
<-gate
- for _ = range b.NextItem {
+ for range b.NextItem {
<-gate
time.Sleep(time.Millisecond)
<-gate
time.Sleep(time.Millisecond)
- b.ReportDone <- struct{}{}
+ b.DoneItem <- struct{}{}
}
}()
expectEqualWithin(t, time.Second, 0, func() interface{} { return b.Status().InProgress })
}
}()
expectEqualWithin(t, time.Second, 0, func() interface{} { return b.Status().InProgress })
- b.ReplaceQueue(makeTestWorkList([]int{4, 5, 6}))
+ b.ReplaceQueue(makeTestWorkList([]interface{}{4, 5, 6}))
for i := 1; i <= 3; i++ {
gate <- struct{}{}
expectEqualWithin(t, time.Second, 3-i, func() interface{} { return b.Status().Queued })
for i := 1; i <= 3; i++ {
gate <- struct{}{}
expectEqualWithin(t, time.Second, 3-i, func() interface{} { return b.Status().Queued })
@@ -118,7 +122,7 @@
func TestWorkQueueDoneness(t *testing.T) {
// Create a WorkQueue, generate a list for it, and instantiate a worker.
func TestWorkQueueReadWrite(t *testing.T) {
// Create a WorkQueue, generate a list for it, and instantiate a worker.
func TestWorkQueueReadWrite(t *testing.T) {
- var input = []int{1, 1, 2, 3, 5, 8, 13, 21, 34}
+ var input = []interface{}{1, 1, 2, 3, 5, 8, 13, 21, 34}
b := NewWorkQueue()
expectQueued(t, b, 0)
b := NewWorkQueue()
expectQueued(t, b, 0)
@@ -133,7 +137,7 @@
func TestWorkQueueReadWrite(t *testing.T) {
// Start a worker before the list has any input.
func TestWorkQueueEarlyRead(t *testing.T) {
// Start a worker before the list has any input.
func TestWorkQueueEarlyRead(t *testing.T) {
- var input = []int{1, 1, 2, 3, 5, 8, 13, 21, 34}
+ var input = []interface{}{1, 1, 2, 3, 5, 8, 13, 21, 34}
b := NewWorkQueue()
defer b.Close()
b := NewWorkQueue()
defer b.Close()
@@ -161,13 +165,13 @@
func TestWorkQueueEarlyRead(t *testing.T) {
// After Close(), NextItem closes, work finishes, then stats return zero.
func TestWorkQueueClose(t *testing.T) {
b := NewWorkQueue()
// After Close(), NextItem closes, work finishes, then stats return zero.
func TestWorkQueueClose(t *testing.T) {
b := NewWorkQueue()
- input := []int{1, 2, 3, 4, 5, 6, 7, 8}
+ input := []interface{}{1, 2, 3, 4, 5, 6, 7, 8}
mark := make(chan struct{})
go func() {
<-b.NextItem
mark <- struct{}{}
<-mark
mark := make(chan struct{})
go func() {
<-b.NextItem
mark <- struct{}{}
<-mark
- b.ReportDone <- struct{}{}
+ b.DoneItem <- struct{}{}
}()
b.ReplaceQueue(makeTestWorkList(input))
// Wait for worker to take item 1
}()
b.ReplaceQueue(makeTestWorkList(input))
// Wait for worker to take item 1
@@ -185,8 +189,8 @@
func TestWorkQueueClose(t *testing.T) {
// available.
func TestWorkQueueReaderBlocks(t *testing.T) {
var (
// available.
func TestWorkQueueReaderBlocks(t *testing.T) {
var (
- inputBeforeBlock = []int{1, 2, 3, 4, 5}
- inputAfterBlock = []int{6, 7, 8, 9, 10}
+ inputBeforeBlock = []interface{}{1, 2, 3, 4, 5}
+ inputAfterBlock = []interface{}{6, 7, 8, 9, 10}
)
b := NewWorkQueue()
)
b := NewWorkQueue()
@@ -219,8 +223,8 @@
func TestWorkQueueReaderBlocks(t *testing.T) {
// Replace one active work list with another.
func TestWorkQueueReplaceQueue(t *testing.T) {
// Replace one active work list with another.
func TestWorkQueueReplaceQueue(t *testing.T) {
- var firstInput = []int{1, 1, 2, 3, 5, 8, 13, 21, 34}
- var replaceInput = []int{1, 4, 9, 16, 25, 36, 49, 64, 81}
+ var firstInput = []interface{}{1, 1, 2, 3, 5, 8, 13, 21, 34}
+ var replaceInput = []interface{}{1, 4, 9, 16, 25, 36, 49, 64, 81}
b := NewWorkQueue()
b.ReplaceQueue(makeTestWorkList(firstInput))
b := NewWorkQueue()
b.ReplaceQueue(makeTestWorkList(firstInput))