X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/4de0af809ffbef43d89cd1751e5d611a4b5445e9..7a53cfc92d4bca452a687db0a6f338e6deb1564a:/services/keepstore/work_queue_test.go diff --git a/services/keepstore/work_queue_test.go b/services/keepstore/work_queue_test.go index df0fa9c612..6b31795293 100644 --- a/services/keepstore/work_queue_test.go +++ b/services/keepstore/work_queue_test.go @@ -59,7 +59,7 @@ func doWorkItems(t fatalfer, q *WorkQueue, expected []int) { if !ok { t.Fatalf("Expected %+v but channel was closed after receiving %+v as expected.", expected, expected[:i]) } - q.ReportDone <- struct{}{} + q.DoneItem <- struct{}{} if actual.(int) != expected[i] { t.Fatalf("Expected %+v but received %+v after receiving %+v as expected.", expected[i], actual, expected[:i]) } @@ -84,9 +84,9 @@ func expectEqualWithin(t fatalfer, timeout time.Duration, expect interface{}, f } } -func expectCountQueued(t fatalfer, b *WorkQueue, expectCountQueued int) { - if l := b.CountQueued(); l != expectCountQueued { - t.Fatalf("Got CountQueued()==%d, expected %d", l, expectCountQueued) +func expectQueued(t fatalfer, b *WorkQueue, expectQueued int) { + if l := b.Status().Queued; l != expectQueued { + t.Fatalf("Got Queued==%d, expected %d", l, expectQueued) } } @@ -94,18 +94,25 @@ func TestWorkQueueDoneness(t *testing.T) { b := NewWorkQueue() defer b.Close() b.ReplaceQueue(makeTestWorkList([]int{1, 2, 3})) - expectCountQueued(t, b, 3) + expectQueued(t, b, 3) + gate := make(chan struct{}) go func() { - for _ = range b.NextItem { - //time.Sleep(time.Duration(delay.(int)) * time.Millisecond) + <-gate + for range b.NextItem { + <-gate time.Sleep(time.Millisecond) - b.ReportDone <- struct{}{} + b.DoneItem <- struct{}{} } }() - expectEqualWithin(t, time.Second, 0, func() interface{} { return b.CountOutstanding() }) - b.ReplaceQueue(makeTestWorkList([]int{400, 5, 6})) - expectEqualWithin(t, time.Second, 3, func() interface{} { return b.CountOutstanding() }) - expectEqualWithin(t, time.Second, 0, func() interface{} { return b.CountOutstanding() }) + expectEqualWithin(t, time.Second, 0, func() interface{} { return b.Status().InProgress }) + b.ReplaceQueue(makeTestWorkList([]int{4, 5, 6})) + for i := 1; i <= 3; i++ { + gate <- struct{}{} + expectEqualWithin(t, time.Second, 3-i, func() interface{} { return b.Status().Queued }) + expectEqualWithin(t, time.Second, 1, func() interface{} { return b.Status().InProgress }) + } + close(gate) + expectEqualWithin(t, time.Second, 0, func() interface{} { return b.Status().InProgress }) expectChannelEmpty(t, b.NextItem) } @@ -114,10 +121,10 @@ func TestWorkQueueReadWrite(t *testing.T) { var input = []int{1, 1, 2, 3, 5, 8, 13, 21, 34} b := NewWorkQueue() - expectCountQueued(t, b, 0) + expectQueued(t, b, 0) b.ReplaceQueue(makeTestWorkList(input)) - expectCountQueued(t, b, len(input)) + expectQueued(t, b, len(input)) doWorkItems(t, b, input) expectChannelEmpty(t, b.NextItem) @@ -148,7 +155,7 @@ func TestWorkQueueEarlyRead(t *testing.T) { // finish. b.ReplaceQueue(makeTestWorkList(input)) <-done - expectCountQueued(t, b, 0) + expectQueued(t, b, 0) } // After Close(), NextItem closes, work finishes, then stats return zero. @@ -160,16 +167,16 @@ func TestWorkQueueClose(t *testing.T) { <-b.NextItem mark <- struct{}{} <-mark - b.ReportDone <- struct{}{} + b.DoneItem <- struct{}{} }() b.ReplaceQueue(makeTestWorkList(input)) // Wait for worker to take item 1 <-mark b.Close() - expectEqualWithin(t, time.Second, 1, func() interface{} { return b.CountOutstanding() }) + expectEqualWithin(t, time.Second, 1, func() interface{} { return b.Status().InProgress }) // Tell worker to report done mark <- struct{}{} - expectEqualWithin(t, time.Second, 0, func() interface{} { return b.CountOutstanding() }) + expectEqualWithin(t, time.Second, 0, func() interface{} { return b.Status().InProgress }) expectChannelClosedWithin(t, time.Second, b.NextItem) }