-package main
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package keepstore
import (
"container/list"
Fatalf(string, ...interface{})
}
-func makeTestWorkList(ary []int) *list.List {
+func makeTestWorkList(ary []interface{}) *list.List {
l := list.New()
for _, n := range ary {
l.PushBack(n)
}
}
// doWorkItems receives one item from q.NextItem for each entry of
// expected, in order, acknowledges it on the queue's done channel, and
// then compares it with the expected entry. It calls t.Fatalf if the
// channel is closed before all expected items arrive or if a received
// item differs from the expected one. This change switches expected
// from []int to []interface{} and renames the done channel from
// ReportDone to DoneItem.
-func doWorkItems(t fatalfer, q *WorkQueue, expected []int) {
+func doWorkItems(t fatalfer, q *WorkQueue, expected []interface{}) {
for i := range expected {
actual, ok := <-q.NextItem
if !ok {
t.Fatalf("Expected %+v but channel was closed after receiving %+v as expected.", expected, expected[:i])
}
// The item is reported done before its value is checked.
- q.ReportDone <- struct{}{}
+ q.DoneItem <- struct{}{}
// The single-value type assertion panics if the received item is not
// an int.
if actual.(int) != expected[i] {
t.Fatalf("Expected %+v but received %+v after receiving %+v as expected.", expected[i], actual, expected[:i])
}
}
}
// Fails the test via t.Fatalf unless the number of queued items
// reported by b equals the expected count. This change renames the
// helper from expectCountQueued to expectQueued and reads the count
// from b.Status().Queued instead of b.CountQueued().
-func expectCountQueued(t fatalfer, b *WorkQueue, expectCountQueued int) {
- if l := b.CountQueued(); l != expectCountQueued {
- t.Fatalf("Got CountQueued()==%d, expected %d", l, expectCountQueued)
+func expectQueued(t fatalfer, b *WorkQueue, expectQueued int) {
+ if l := b.Status().Queued; l != expectQueued {
+ t.Fatalf("Got Queued==%d, expected %d", l, expectQueued)
}
}
// TestWorkQueueDoneness checks the Queued and InProgress counts reported
// by Status() while a single worker goroutine consumes a replaced work
// list. In the new version the worker is stepped one item at a time
// through the gate channel, so the counts can be checked at each step.
func TestWorkQueueDoneness(t *testing.T) {
b := NewWorkQueue()
defer b.Close()
- b.ReplaceQueue(makeTestWorkList([]int{1, 2, 3}))
- expectCountQueued(t, b, 3)
+ b.ReplaceQueue(makeTestWorkList([]interface{}{1, 2, 3}))
+ expectQueued(t, b, 3)
// gate is unbuffered: each send below blocks until the worker receives it.
+ gate := make(chan struct{})
go func() {
- for _ = range b.NextItem {
- //time.Sleep(time.Duration(delay.(int)) * time.Millisecond)
// The worker takes no item until the first gate signal. After taking
// each item it waits for another signal (or for gate to be closed)
// before sleeping briefly and reporting the item done.
+ <-gate
+ for range b.NextItem {
+ <-gate
time.Sleep(time.Millisecond)
- b.ReportDone <- struct{}{}
+ b.DoneItem <- struct{}{}
}
}()
- expectEqualWithin(t, time.Second, 0, func() interface{} { return b.CountOutstanding() })
- b.ReplaceQueue(makeTestWorkList([]int{400, 5, 6}))
- expectEqualWithin(t, time.Second, 3, func() interface{} { return b.CountOutstanding() })
- expectEqualWithin(t, time.Second, 0, func() interface{} { return b.CountOutstanding() })
// The worker is still waiting on gate here, so nothing is expected to be
// in progress when the list is replaced.
+ expectEqualWithin(t, time.Second, 0, func() interface{} { return b.Status().InProgress })
+ b.ReplaceQueue(makeTestWorkList([]interface{}{4, 5, 6}))
// Each signal lets the worker finish its current item (if any) and take
// the next one, so Queued is expected to drop to 3-i while exactly one
// item is in progress.
+ for i := 1; i <= 3; i++ {
+ gate <- struct{}{}
+ expectEqualWithin(t, time.Second, 3-i, func() interface{} { return b.Status().Queued })
+ expectEqualWithin(t, time.Second, 1, func() interface{} { return b.Status().InProgress })
+ }
// Closing gate releases the worker's pending receive so the last item
// can be reported done.
+ close(gate)
+ expectEqualWithin(t, time.Second, 0, func() interface{} { return b.Status().InProgress })
expectChannelEmpty(t, b.NextItem)
}
// Create a WorkQueue, generate a list for it, and instantiate a worker.
func TestWorkQueueReadWrite(t *testing.T) {
- var input = []int{1, 1, 2, 3, 5, 8, 13, 21, 34}
+ var input = []interface{}{1, 1, 2, 3, 5, 8, 13, 21, 34}
b := NewWorkQueue()
- expectCountQueued(t, b, 0)
+ expectQueued(t, b, 0)
b.ReplaceQueue(makeTestWorkList(input))
- expectCountQueued(t, b, len(input))
+ expectQueued(t, b, len(input))
doWorkItems(t, b, input)
expectChannelEmpty(t, b.NextItem)
// Start a worker before the list has any input.
func TestWorkQueueEarlyRead(t *testing.T) {
- var input = []int{1, 1, 2, 3, 5, 8, 13, 21, 34}
+ var input = []interface{}{1, 1, 2, 3, 5, 8, 13, 21, 34}
b := NewWorkQueue()
defer b.Close()
// finish.
b.ReplaceQueue(makeTestWorkList(input))
<-done
- expectCountQueued(t, b, 0)
+ expectQueued(t, b, 0)
}
// After Close(), NextItem closes, work finishes, then stats return zero.
func TestWorkQueueClose(t *testing.T) {
b := NewWorkQueue()
- input := []int{1, 2, 3, 4, 5, 6, 7, 8}
+ input := []interface{}{1, 2, 3, 4, 5, 6, 7, 8}
// mark is an unbuffered channel used as a two-way handshake between the
// test and the worker goroutine.
mark := make(chan struct{})
// The worker takes exactly one item, tells the test it has done so,
// waits for the test's go-ahead, and only then reports the item done.
go func() {
<-b.NextItem
mark <- struct{}{}
<-mark
- b.ReportDone <- struct{}{}
+ b.DoneItem <- struct{}{}
}()
b.ReplaceQueue(makeTestWorkList(input))
// Wait for worker to take item 1
<-mark
b.Close()
// The item taken before Close() has not been reported done yet, so one
// item is still expected to be in progress.
- expectEqualWithin(t, time.Second, 1, func() interface{} { return b.CountOutstanding() })
+ expectEqualWithin(t, time.Second, 1, func() interface{} { return b.Status().InProgress })
// Tell worker to report done
mark <- struct{}{}
- expectEqualWithin(t, time.Second, 0, func() interface{} { return b.CountOutstanding() })
+ expectEqualWithin(t, time.Second, 0, func() interface{} { return b.Status().InProgress })
expectChannelClosedWithin(t, time.Second, b.NextItem)
}
// available.
func TestWorkQueueReaderBlocks(t *testing.T) {
var (
- inputBeforeBlock = []int{1, 2, 3, 4, 5}
- inputAfterBlock = []int{6, 7, 8, 9, 10}
+ inputBeforeBlock = []interface{}{1, 2, 3, 4, 5}
+ inputAfterBlock = []interface{}{6, 7, 8, 9, 10}
)
b := NewWorkQueue()
// Replace one active work list with another.
func TestWorkQueueReplaceQueue(t *testing.T) {
- var firstInput = []int{1, 1, 2, 3, 5, 8, 13, 21, 34}
- var replaceInput = []int{1, 4, 9, 16, 25, 36, 49, 64, 81}
+ var firstInput = []interface{}{1, 1, 2, 3, 5, 8, 13, 21, 34}
+ var replaceInput = []interface{}{1, 4, 9, 16, 25, 36, 49, 64, 81}
b := NewWorkQueue()
b.ReplaceQueue(makeTestWorkList(firstInput))